[SPARK-25887][K8S] Configurable K8S context support

This enhancement allows for specifying the desired context to use for the initial K8S client auto-configuration.  This allows users to more easily access alternative K8S contexts without having to first
explicitly change their current context via kubectl.

Explicitly set my K8S context to a context pointing to a non-existent cluster, then launched Spark jobs with explicitly specified contexts via the new `spark.kubernetes.context` configuration property.

Example Output:

```
> kubectl config current-context
minikube
> minikube status
minikube: Stopped
cluster:
kubectl:
> ./spark-submit --master k8s://https://localhost:6443 --deploy-mode cluster --name spark-pi --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=2 --conf spark.kubernetes.context=docker-for-desktop --conf spark.kubernetes.container.image=rvesse/spark:debian local:///opt/spark/examples/jars/spark-examples_2.11-3.0.0-SNAPSHOT.jar 4
18/10/31 11:57:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/10/31 11:57:51 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using context docker-for-desktop from users K8S config file
18/10/31 11:57:52 INFO LoggingPodStatusWatcherImpl: State changed, new state:
	 pod name: spark-pi-1540987071845-driver
	 namespace: default
	 labels: spark-app-selector -> spark-2c4abc226ed3415986eb602bd13f3582, spark-role -> driver
	 pod uid: 32462cac-dd04-11e8-b6c6-025000000001
	 creation time: 2018-10-31T11:57:52Z
	 service account name: default
	 volumes: spark-local-dir-1, spark-conf-volume, default-token-glpfv
	 node name: N/A
	 start time: N/A
	 phase: Pending
	 container status: N/A
18/10/31 11:57:52 INFO LoggingPodStatusWatcherImpl: State changed, new state:
	 pod name: spark-pi-1540987071845-driver
	 namespace: default
	 labels: spark-app-selector -> spark-2c4abc226ed3415986eb602bd13f3582, spark-role -> driver
	 pod uid: 32462cac-dd04-11e8-b6c6-025000000001
	 creation time: 2018-10-31T11:57:52Z
	 service account name: default
	 volumes: spark-local-dir-1, spark-conf-volume, default-token-glpfv
	 node name: docker-for-desktop
	 start time: N/A
	 phase: Pending
	 container status: N/A
...
18/10/31 11:58:03 INFO LoggingPodStatusWatcherImpl: State changed, new state:
	 pod name: spark-pi-1540987071845-driver
	 namespace: default
	 labels: spark-app-selector -> spark-2c4abc226ed3415986eb602bd13f3582, spark-role -> driver
	 pod uid: 32462cac-dd04-11e8-b6c6-025000000001
	 creation time: 2018-10-31T11:57:52Z
	 service account name: default
	 volumes: spark-local-dir-1, spark-conf-volume, default-token-glpfv
	 node name: docker-for-desktop
	 start time: 2018-10-31T11:57:52Z
	 phase: Succeeded
	 container status:
		 container name: spark-kubernetes-driver
		 container image: rvesse/spark:debian
		 container state: terminated
		 container started at: 2018-10-31T11:57:54Z
		 container finished at: 2018-10-31T11:58:02Z
		 exit code: 0
		 termination reason: Completed
```

Without the `spark.kubernetes.context` setting this will fail because the current context - `minikube` - is pointing to a non-running cluster e.g.

```
> ./spark-submit --master k8s://https://localhost:6443 --deploy-mode cluster --name spark-pi --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=2 --conf spark.kubernetes.container.image=rvesse/spark:debian local:///opt/spark/examples/jars/spark-examples_2.11-3.0.0-SNAPSHOT.jar 4
18/10/31 12:02:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/10/31 12:02:30 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
18/10/31 12:02:31 WARN WatchConnectionManager: Exec Failure
javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
	at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1949)
	at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
	at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
	at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1509)
	at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
	at sun.security.ssl.Handshaker.processLoop(Handshaker.java:979)
	at sun.security.ssl.Handshaker.process_record(Handshaker.java:914)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1062)
	at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
	at okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)
	at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
	at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
	at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
	at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
	at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
	at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:119)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:66)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at io.fabric8.kubernetes.client.utils.HttpClientUtils$2.intercept(HttpClientUtils.java:109)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
	at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
	at sun.security.validator.Validator.validate(Validator.java:260)
	at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
	at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:229)
	at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:124)
	at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1491)
	... 39 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
	at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
	at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
	at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:382)
	... 45 more
Exception in thread "kubernetes-dispatcher-0" Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask611a9c09 rejected from java.util.concurrent.ScheduledThreadPoolExecutor404819e4[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
	at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:632)
	at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.scheduleReconnect(WatchConnectionManager.java:300)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.access$800(WatchConnectionManager.java:48)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$2.onFailure(WatchConnectionManager.java:213)
	at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:543)
	at okhttp3.internal.ws.RealWebSocket$2.onFailure(RealWebSocket.java:208)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:148)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
io.fabric8.kubernetes.client.KubernetesClientException: Failed to start websocket
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$2.onFailure(WatchConnectionManager.java:204)
	at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:543)
	at okhttp3.internal.ws.RealWebSocket$2.onFailure(RealWebSocket.java:208)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:148)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
	at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1949)
	at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
	at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
	at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1509)
	at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
	at sun.security.ssl.Handshaker.processLoop(Handshaker.java:979)
	at sun.security.ssl.Handshaker.process_record(Handshaker.java:914)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1062)
	at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
	at okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)
	at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
	at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
	at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
	at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
	at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
	at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:119)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:66)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at io.fabric8.kubernetes.client.utils.HttpClientUtils$2.intercept(HttpClientUtils.java:109)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
	... 4 more
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
	at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
	at sun.security.validator.Validator.validate(Validator.java:260)
	at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
	at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:229)
	at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:124)
	at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1491)
	... 39 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
	at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
	at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
	at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
	at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:382)
	... 45 more
18/10/31 12:02:31 INFO ShutdownHookManager: Shutdown hook called
18/10/31 12:02:31 INFO ShutdownHookManager: Deleting directory /private/var/folders/6b/y1010qp107j9w2dhhy8csvz0000xq3/T/spark-5e649891-8a0f-4f17-bf3a-33b34082eba8
```

Suggested reviews: mccheah liyinan926 - this is the follow up fix to the bug discovered while working on SPARK-25809 (PR #22805)

Closes #22904 from rvesse/SPARK-25887.

Authored-by: Rob Vesse <rvesse@dotnetrdf.org>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Rob Vesse 2019-01-22 10:17:40 -08:00 committed by Marcelo Vanzin
parent 66450bbc1b
commit c542c247bb
3 changed files with 47 additions and 8 deletions

View file

@ -334,6 +334,16 @@ the Spark application.
## Kubernetes Features
### Configuration File
Your Kubernetes config file typically lives under `.kube/config` in your home directory or in a location specified by the `KUBECONFIG` environment variable. Spark on Kubernetes will attempt to use this file to do an initial auto-configuration of the Kubernetes client used to interact with the Kubernetes cluster. A variety of Spark configuration properties are provided that allow further customising the client configuration e.g. using an alternative authentication method.
### Contexts
Kubernetes configuration files can contain multiple contexts that allow for switching between different clusters and/or user identities. By default Spark on Kubernetes will use your current context (which can be checked by running `kubectl config current-context`) when doing the initial auto-configuration of the Kubernetes client.
In order to use an alternative context users can specify the desired context via the Spark configuration property `spark.kubernetes.context` e.g. `spark.kubernetes.context=minikube`.
### Namespaces
Kubernetes has the concept of [namespaces](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/).
@ -406,13 +416,23 @@ Some of these include:
# Configuration
See the [configuration page](configuration.html) for information on Spark configurations. The following configurations are
specific to Spark on Kubernetes.
See the [configuration page](configuration.html) for information on Spark configurations. The following configurations are specific to Spark on Kubernetes.
#### Spark Properties
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.kubernetes.context</code></td>
<td><code>(none)</code></td>
<td>
The context from the user Kubernetes configuration file used for the initial
auto-configuration of the Kubernetes client library. When not specified then
the users current context is used. <strong>NB:</strong> Many of the
auto-configured settings can be overridden by the use of other Spark
configuration properties e.g. <code>spark.kubernetes.namespace</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.namespace</code></td>
<td><code>default</code></td>

View file

@ -24,6 +24,18 @@ import org.apache.spark.internal.config.ConfigBuilder
private[spark] object Config extends Logging {
val KUBERNETES_CONTEXT =
ConfigBuilder("spark.kubernetes.context")
.doc("The desired context from your K8S config file used to configure the K8S " +
"client for interacting with the cluster. Useful if your config file has " +
"multiple clusters or user identities defined. The client library used " +
"locates the config file via the KUBECONFIG environment variable or by defaulting " +
"to .kube/config under your home directory. If not specified then your current " +
"context is used. You can always override specific aspects of the config file " +
"provided configuration using other Spark on K8S configuration options.")
.stringConf
.createOptional
val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
.doc("The namespace that will be used for running the driver and executor pods.")

View file

@ -21,11 +21,13 @@ import java.io.File
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.utils.HttpClientUtils
import okhttp3.Dispatcher
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils
/**
@ -33,7 +35,7 @@ import org.apache.spark.util.ThreadUtils
* parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
* options for different components.
*/
private[spark] object SparkKubernetesClientFactory {
private[spark] object SparkKubernetesClientFactory extends Logging {
def createKubernetesClient(
master: String,
@ -42,9 +44,6 @@ private[spark] object SparkKubernetesClientFactory {
sparkConf: SparkConf,
defaultServiceAccountToken: Option[File],
defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
// TODO [SPARK-25887] Support configurable context
val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
@ -67,8 +66,16 @@ private[spark] object SparkKubernetesClientFactory {
val dispatcher = new Dispatcher(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
// TODO [SPARK-25887] Create builder in a way that respects configurable context
val config = new ConfigBuilder()
// Allow for specifying a context used to auto-configure from the users K8S config file
val kubeContext = sparkConf.get(KUBERNETES_CONTEXT).filter(_.nonEmpty)
logInfo("Auto-configuring K8S client using " +
kubeContext.map("context " + _).getOrElse("current context") +
" from users K8S config file")
// Start from an auto-configured config with the desired context
// Fabric 8 uses null to indicate that the users current context should be used so if no
// explicit setting pass null
val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)