[SPARK-27023][K8S] Make k8s client timeouts configurable

## What changes were proposed in this pull request?

Make k8s client timeouts configurable. No test suite exists for the client factory class; happy to add one if needed.

Closes #23928 from onursatici/os/k8s-client-timeouts.

Lead-authored-by: Onur Satici <osatici@palantir.com>
Co-authored-by: Onur Satici <onursatici@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Onur Satici 2019-03-06 11:14:39 -08:00 committed by Dongjoon Hyun
parent cb20fbc43e
commit e9e8bb33ef
5 changed files with 74 additions and 0 deletions

View file

@ -990,6 +990,34 @@ See the [configuration page](configuration.html) for information on Spark config
Specify whether executor pods should be deleted in case of failure or normal termination.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.submission.connectionTimeout</code></td>
<td>10000</td>
<td>
Connection timeout in milliseconds for the Kubernetes client to use for starting the driver.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.submission.requestTimeout</code></td>
<td>10000</td>
<td>
Request timeout in milliseconds for the Kubernetes client to use for starting the driver.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.connectionTimeout</code></td>
<td>10000</td>
<td>
Connection timeout in milliseconds for the Kubernetes client in the driver to use when requesting executors.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.requestTimeout</code></td>
<td>10000</td>
<td>
Request timeout in milliseconds for the Kubernetes client in the driver to use when requesting executors.
</td>
</tr>
</table>
#### Pod template properties

View file

@ -86,6 +86,30 @@ private[spark] object Config extends Logging {
val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
// Request timeout, in milliseconds, used when starting the driver.
// Integer-valued entry keyed by spark.kubernetes.submission.requestTimeout; defaults to 10000.
// NOTE(review): no checkValue guard is attached, so zero or negative values are not rejected here.
val SUBMISSION_CLIENT_REQUEST_TIMEOUT =
ConfigBuilder("spark.kubernetes.submission.requestTimeout")
.doc("request timeout to be used in milliseconds for starting the driver")
.intConf
.createWithDefault(10000)
// Connection timeout, in milliseconds, used when starting the driver.
// Integer-valued entry keyed by spark.kubernetes.submission.connectionTimeout; defaults to 10000.
// NOTE(review): no checkValue guard is attached, so zero or negative values are not rejected here.
val SUBMISSION_CLIENT_CONNECTION_TIMEOUT =
ConfigBuilder("spark.kubernetes.submission.connectionTimeout")
.doc("connection timeout to be used in milliseconds for starting the driver")
.intConf
.createWithDefault(10000)
// Request timeout, in milliseconds, used by the driver when requesting executors.
// Integer-valued entry keyed by spark.kubernetes.driver.requestTimeout; defaults to 10000.
// NOTE(review): no checkValue guard is attached, so zero or negative values are not rejected here.
val DRIVER_CLIENT_REQUEST_TIMEOUT =
ConfigBuilder("spark.kubernetes.driver.requestTimeout")
.doc("request timeout to be used in milliseconds for driver to request executors")
.intConf
.createWithDefault(10000)
// Connection timeout, in milliseconds, used by the driver when requesting executors.
// Integer-valued entry keyed by spark.kubernetes.driver.connectionTimeout; defaults to 10000.
// NOTE(review): no checkValue guard is attached, so zero or negative values are not rejected here.
val DRIVER_CLIENT_CONNECTION_TIMEOUT =
ConfigBuilder("spark.kubernetes.driver.connectionTimeout")
.doc("connection timeout to be used in milliseconds for driver to request executors")
.intConf
.createWithDefault(10000)
val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +

View file

@ -28,6 +28,7 @@ import okhttp3.Dispatcher
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.ThreadUtils
/**
@ -41,6 +42,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
master: String,
namespace: Option[String],
kubernetesAuthConfPrefix: String,
clientType: ClientType.Value,
sparkConf: SparkConf,
defaultServiceAccountToken: Option[File],
defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
@ -79,6 +81,8 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withRequestTimeout(clientType.requestTimeout(sparkConf))
.withConnectionTimeout(clientType.connectionTimeout(sparkConf))
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
@ -111,4 +115,20 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}.getOrElse(configBuilder)
}
}
/**
 * Kinds of Kubernetes client this factory can build. Each value carries the pair of
 * config entries from which its request and connection timeouts are read.
 */
object ClientType extends Enumeration {
  import scala.language.implicitConversions

  /**
   * Enumeration value that bundles the timeout config entries for one client type.
   *
   * @param requestTimeoutEntry config entry holding the request timeout
   * @param connectionTimeoutEntry config entry holding the connection timeout
   */
  protected case class Val(
      requestTimeoutEntry: ConfigEntry[Int],
      connectionTimeoutEntry: ConfigEntry[Int])
    extends super.Val {

    /** Reads this client type's request timeout from `conf`. */
    def requestTimeout(conf: SparkConf): Int = {
      conf.get(requestTimeoutEntry)
    }

    /** Reads this client type's connection timeout from `conf`. */
    def connectionTimeout(conf: SparkConf): Int = {
      conf.get(connectionTimeoutEntry)
    }
  }

  /**
   * Lets callers holding a plain `ClientType.Value` reach the timeout accessors.
   * The cast relies on every value declared below being constructed through `Val`.
   */
  implicit def convert(value: Value): Val = value.asInstanceOf[Val]

  // Declaration order is kept (Driver, then Submission) so the enumeration ids are unchanged.
  val Driver = Val(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT)
  val Submission = Val(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT)
}
}

View file

@ -220,6 +220,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
master,
Some(kubernetesConf.namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
SparkKubernetesClientFactory.ClientType.Submission,
sparkConf,
None,
None)) { kubernetesClient =>

View file

@ -65,6 +65,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
apiServerUri,
Some(sc.conf.get(KUBERNETES_NAMESPACE)),
authConfPrefix,
SparkKubernetesClientFactory.ClientType.Driver,
sc.conf,
defaultServiceAccountToken,
defaultServiceAccountCaCrt)