[SPARK-23153][K8S] Support client dependencies with a Hadoop Compatible File System

## What changes were proposed in this pull request?
- Solves the current issue with `--packages` in cluster mode (there is no ticket for it). Also note some past [issues](https://issues.apache.org/jira/browse/SPARK-22657) when Hadoop libs are used on the spark-submit side.
- Supports `spark.jars`, `spark.files`, and the app jar.

It works as follows:
spark-submit uploads the dependencies to the HCFS. The driver then serves them to the executors via the Spark file server, so no HCFS URIs are propagated.
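
For illustration, the end-to-end rewrite of a single jar entry roughly looks like this (bucket, UUID and temp paths are hypothetical):
```
# on the submission client, in cluster mode
spark.jars=file:///home/me/app.jar
# after spark-submit uploads it to spark.kubernetes.file.upload.path
spark.jars=s3a://<bucket>/spark-upload-<uuid>/app.jar
# inside the driver: downloaded again and rewritten to the local copy,
# which the Spark file server then serves to the executors
spark.jars=/tmp/spark-<id>/app.jar
```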

The related design document is [here](https://docs.google.com/document/d/1peg_qVhLaAl4weo5C51jQicPwLclApBsdR1To2fgc48/edit). The next option to add is the RSS, but it has to be improved given the past discussion about it (Spark 2.3).
## How was this patch tested?

- Run integration test suite.
- Run an example using S3:

```
 ./bin/spark-submit \
...
 --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6 \
 --deploy-mode cluster \
 --name spark-pi \
 --class org.apache.spark.examples.SparkPi \
 --conf spark.executor.memory=1G \
 --conf spark.kubernetes.namespace=spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
 --conf spark.driver.memory=1G \
 --conf spark.executor.instances=2 \
 --conf spark.sql.streaming.metricsEnabled=true \
 --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
 --conf spark.kubernetes.container.image.pullPolicy=Always \
 --conf spark.kubernetes.container.image=skonto/spark:k8s-3.0.0 \
 --conf spark.kubernetes.file.upload.path=s3a://fdp-stavros-test \
 --conf spark.hadoop.fs.s3a.access.key=... \
 --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
 --conf spark.hadoop.fs.s3a.fast.upload=true \
 --conf spark.kubernetes.executor.deleteOnTermination=false \
 --conf spark.hadoop.fs.s3a.secret.key=... \
 --conf spark.files=client:///...resolv.conf \
file:///my.jar
```
Added integration tests based on [Ceph nano](https://github.com/ceph/cn), which looks very [active](http://www.sebastien-han.fr/blog/2019/02/24/Ceph-nano-is-getting-better-and-better/).
Unfortunately, MinIO requires Hadoop >= 2.8.

Closes #23546 from skonto/support-client-deps.

Authored-by: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Signed-off-by: Erik Erlandson <eerlands@redhat.com>


@ -29,6 +29,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.{Properties, Try}
import org.apache.commons.io.FilenameUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
@ -222,7 +223,7 @@ private[spark] class SparkSubmit extends Logging {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = new SparkConf()
val sparkConf = args.toSparkConf()
var childMainClass = ""
// Set the cluster manager
@ -313,6 +314,9 @@ private[spark] class SparkSubmit extends Logging {
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
val isMesosClient = clusterManager == MESOS && deployMode == CLIENT
if (!isMesosCluster && !isStandAloneCluster) {
@ -323,9 +327,25 @@ private[spark] class SparkSubmit extends Logging {
args.ivySettingsPath)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
// In K8s client mode, when in the driver, add resolved jars early as we might need
// them at submit time for artifact downloading.
// For example, we might use the dependencies for downloading
// files from a Hadoop compatible fs, e.g. S3. In this case the user might pass:
// --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
if (isKubernetesClusterModeDriver) {
val loader = getSubmitClassLoader(sparkConf)
for (jar <- resolvedMavenCoordinates.split(",")) {
addJarToClasspath(jar, loader)
}
} else if (isKubernetesCluster) {
// We need this in K8s cluster mode so that we can upload local deps
// via the k8s application, like in cluster mode driver
childClasspath ++= resolvedMavenCoordinates.split(",")
} else {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
}
}
}
@ -380,6 +400,17 @@ private[spark] class SparkSubmit extends Logging {
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
if (isKubernetesClusterModeDriver) {
// Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
// Executors will get the jars from the Spark file server.
// Explicitly download the related files here
args.jars = renameResourcesToLocalFS(args.jars, localJars)
val localFiles = Option(args.files).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
args.files = renameResourcesToLocalFS(args.files, localFiles)
}
}
// When running in YARN, for some remote resources with scheme:
@ -535,11 +566,13 @@ private[spark] class SparkSubmit extends Logging {
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.packages"),
OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.repositories"),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.ivy"),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = "spark.jars.excludes"),
// Yarn only
@ -777,6 +810,21 @@ private[spark] class SparkSubmit extends Logging {
(childArgs, childClasspath, sparkConf, childMainClass)
}
private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
if (resources != null && localResources != null) {
val localResourcesSeq = Utils.stringToSeq(localResources)
Utils.stringToSeq(resources).map { resource =>
val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
localResourcesSeq.find { localUri =>
val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
filenameRemote == filenameLocal
}.getOrElse(resource)
}.mkString(",")
} else {
resources
}
}
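A minimal, self-contained sketch of the matching rule in `renameResourcesToLocalFS` (the URIs are made up): a remote entry is swapped for the downloaded copy that shares its file name, and everything else is left untouched.
```
import java.net.URI
import java.nio.file.Paths

object RenameResourcesSketch {
  // same rule as above: match remote and local URIs by file name only
  def rename(resources: Seq[String], localResources: Seq[String]): Seq[String] =
    resources.map { resource =>
      val remoteName = Paths.get(new URI(resource).getPath).getFileName.toString
      localResources.find { localUri =>
        Paths.get(new URI(localUri).getPath).getFileName.toString == remoteName
      }.getOrElse(resource)
    }

  def main(args: Array[String]): Unit = {
    val rewritten = rename(
      Seq("s3a://bucket/spark-upload-1234/app.jar", "local:///opt/spark/extra.jar"),
      Seq("file:/tmp/spark-temp/app.jar"))
    // prints: file:/tmp/spark-temp/app.jar,local:///opt/spark/extra.jar
    println(rewritten.mkString(","))
  }
}
```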
// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
// mode, we must trick it into thinking we're YARN.
@ -787,6 +835,19 @@ private[spark] class SparkSubmit extends Logging {
sparkConf.set(key, shortUserName)
}
private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
loader
}
/**
* Run the main method of the child class using the submit arguments.
*
@ -814,17 +875,7 @@ private[spark] class SparkSubmit extends Logging {
logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
logInfo("\n")
}
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}


@ -1325,12 +1325,12 @@ class SparkSubmitSuite
"--class", "Foo",
"app.jar")
val conf = new SparkSubmitArguments(clArgs).toSparkConf()
Seq(
testConf,
masterConf
).foreach { case (k, v) =>
conf.get(k) should be (v)
}
Seq(
testConf,
masterConf
).foreach { case (k, v) =>
conf.get(k) should be (v)
}
}
}


@ -208,8 +208,31 @@ If your application's dependencies are all hosted in remote locations like HDFS
by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images.
Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the
`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to
dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
client's local file system is currently not yet supported.
dependencies in custom-built Docker images in `spark-submit`. We support dependencies from the submission
client's local file system using the `file://` scheme or without a scheme (using a full path), where the destination should be a Hadoop compatible filesystem.
A typical example of this, using S3, is passing the following options:
```
...
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
--conf spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path
--conf spark.hadoop.fs.s3a.access.key=...
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
--conf spark.hadoop.fs.s3a.fast.upload=true
--conf spark.hadoop.fs.s3a.secret.key=....
--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
file:///full/path/to/app.jar
```
The app jar file will be uploaded to S3 and then, when the driver is launched, it will be downloaded
to the driver pod and added to its classpath. Spark will generate a subdirectory with a random name under the upload path
to avoid conflicts with Spark apps running in parallel. Users can manage the created subdirectories according to their needs.
The client scheme is supported for the application jar and for dependencies specified by the properties `spark.jars` and `spark.files`.
Important: all client-side dependencies will be uploaded to the given path with a flat directory structure, so
file names must be unique or files will be overwritten. Also make sure the default Ivy directory in the derived k8s image
has the required access rights, or modify the settings as above. The latter is also important if you use `--packages` in
cluster mode.
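
For illustration (bucket name and directory names are hypothetical), a submission with one extra jar and one file might leave a layout like the following under the upload path, since a randomly named subdirectory is created per uploaded file:
```
s3a://<s3-bucket>/path/spark-upload-2f9a.../app.jar
s3a://<s3-bucket>/path/spark-upload-7c41.../extra-dep.jar
s3a://<s3-bucket>/path/spark-upload-a0b3.../data.csv
```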
## Secret Management
Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
@ -455,7 +478,6 @@ There are several Spark on Kubernetes features that are currently being worked o
Some of these include:
* Dynamic Resource Allocation and External Shuffle Service
* Local File Dependency Management
* Job Queues and Resource Management
# Configuration
@ -1078,6 +1100,15 @@ See the [configuration page](configuration.html) for information on Spark config
Specify the grace period in seconds when deleting a Spark application using spark-submit.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.file.upload.path</code></td>
<td>(none)</td>
<td>
Path to store files on the spark-submit side in cluster mode. For example:
<code>spark.kubernetes.file.upload.path=s3a://<s3-bucket>/path</code>
Files should be specified as <code>file://path/to/file</code> or as an absolute path.
</td>
</tr>
</table>
#### Pod template properties


@ -338,6 +338,13 @@ private[spark] object Config extends Logging {
.timeConf(TimeUnit.SECONDS)
.createOptional
val KUBERNETES_FILE_UPLOAD_PATH =
ConfigBuilder("spark.kubernetes.file.upload.path")
.doc("Hadoop compatible file system path where files from the local file system " +
"will be uploded to in cluster mode.")
.stringConf
.createOptional
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."


@ -16,18 +16,25 @@
*/
package org.apache.spark.deploy.k8s
import java.io.File
import java.io.{File, IOException}
import java.net.URI
import java.security.SecureRandom
import java.util.UUID
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.Utils.getHadoopFileSystem
private[spark] object KubernetesUtils extends Logging {
@ -209,4 +216,77 @@ private[spark] object KubernetesUtils extends Logging {
Hex.encodeHexString(random) + time
}
/**
* Upload files and modify their uris
*/
def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
: Iterable[String] = {
fileUris.map { uri =>
uploadFileUri(uri, conf)
}
}
private def isLocalDependency(uri: URI): Boolean = {
uri.getScheme match {
case null | "file" => true
case _ => false
}
}
def isLocalAndResolvable(resource: String): Boolean = {
resource != SparkLauncher.NO_RESOURCE &&
isLocalDependency(Utils.resolveURI(resource))
}
def renameMainAppResource(resource: String, conf: SparkConf): String = {
if (isLocalAndResolvable(resource)) {
SparkLauncher.NO_RESOURCE
} else {
resource
}
}
def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
conf match {
case Some(sConf) =>
if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
val fileUri = Utils.resolveURI(uri)
try {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf)
val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get
val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf)
val randomDirName = s"spark-upload-${UUID.randomUUID()}"
fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}"))
val targetUri = s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}"
log.info(s"Uploading file: ${fileUri.getPath} to dest: $targetUri...")
uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new Path(targetUri), fs)
targetUri
} catch {
case e: Exception =>
throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e)
}
} else {
throw new SparkException("Please specify " +
"spark.kubernetes.file.upload.path property.")
}
case _ => throw new SparkException("Spark configuration is missing...")
}
}
/**
* Upload a file to a Hadoop-compatible filesystem.
*/
private def uploadFileToHadoopCompatibleFS(
src: Path,
dest: Path,
fs: FileSystem,
delSrc : Boolean = false,
overwrite: Boolean = true): Unit = {
try {
fs.copyFromLocalFile(false, true, src, dest)
} catch {
case e: IOException =>
throw new SparkException(s"Error uploading file ${src.getName}", e)
}
}
}
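A hedged usage sketch of `uploadFileUri` (the bucket and local jar path are hypothetical; note that `KubernetesUtils` is `private[spark]`, so this only compiles inside Spark's own packages). It requires `spark.kubernetes.file.upload.path` to be set, otherwise a `SparkException` is thrown.
```
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesUtils

val conf = new SparkConf()
  .set("spark.kubernetes.file.upload.path", "s3a://my-bucket/deps")
  .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

// uploads the local jar and returns the HCFS URI that replaces the original entry,
// e.g. s3a://my-bucket/deps/spark-upload-<uuid>/app.jar
val remoteUri = KubernetesUtils.uploadFileUri("file:///tmp/app.jar", Some(conf))
```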


@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
@ -156,6 +155,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
// try to upload local, resolvable files to a Hadoop compatible file system
Seq(JARS, FILES).foreach { key =>
val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
if (resolved.nonEmpty) {
additionalProps.put(key.key, resolved.mkString(","))
}
}
additionalProps.toMap
}
}


@ -24,9 +24,7 @@ import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
/**
* Creates the driver command for running the user app, and propagates needed configuration so
@ -88,11 +86,17 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
}
private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
// re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
val resolvedResource = if (conf.mainAppResource.isInstanceOf[JavaMainAppResource]) {
KubernetesUtils.renameMainAppResource(resource, conf.sparkConf)
} else {
resource
}
new ContainerBuilder(pod.container)
.addToArgs("driver")
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", conf.mainClass)
.addToArgs(resource)
.addToArgs(resolvedResource)
.addToArgs(conf.appArgs: _*)
}
}


@ -76,6 +76,12 @@
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.7.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>


@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest
import java.net.URL
import scala.collection.JavaConverters._
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Span}
import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH}
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
import KubernetesSuite.k8sTestTag
val cName = "ceph-nano"
val svcName = s"$cName-s3"
val bucket = "spark"
private def getCephContainer(): Container = {
val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4",
"RGW_CIVETWEB_PORT" -> "8000",
"SREE_PORT" -> "5001",
"CEPH_DEMO_UID" -> "nano",
"CEPH_DAEMON" -> "demo",
"DEBUG" -> "verbose"
).map( envV =>
new EnvVarBuilder()
.withName(envV._1)
.withValue(envV._2)
.build()
).toArray
val resources = Map(
"cpu" -> new QuantityBuilder()
.withAmount("1")
.build(),
"memory" -> new QuantityBuilder()
.withAmount("512M")
.build()
).asJava
new ContainerBuilder()
.withImage("ceph/daemon:v4.0.0-stable-4.0-master-centos-7-x86_64")
.withImagePullPolicy("Always")
.withName(cName)
.withPorts(new ContainerPortBuilder()
.withName(svcName)
.withProtocol("TCP")
.withContainerPort(8000)
.build()
)
.withResources(new ResourceRequirementsBuilder()
.withLimits(resources)
.withRequests(resources)
.build()
)
.withEnv(envVars: _*)
.build()
}
// Based on https://github.com/ceph/cn
private def setupCephStorage(): Unit = {
val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava
val cephService = new ServiceBuilder()
.withNewMetadata()
.withName(svcName)
.withLabels(labels)
.endMetadata()
.withNewSpec()
.withPorts(new ServicePortBuilder()
.withName("https")
.withPort(8000)
.withProtocol("TCP")
.withTargetPort(new IntOrString(8000))
.build()
)
.withType("NodePort")
.withSelector(labels)
.endSpec()
.build()
val cephStatefulSet = new StatefulSetBuilder()
.withNewMetadata()
.withName(cName)
.withLabels(labels)
.endMetadata()
.withNewSpec()
.withReplicas(1)
.withNewSelector()
.withMatchLabels(Map("app" -> "ceph").asJava)
.endSelector()
.withServiceName(cName)
.withNewTemplate()
.withNewMetadata()
.withName(cName)
.withLabels(labels)
.endMetadata()
.withNewSpec()
.withContainers(getCephContainer())
.endSpec()
.endTemplate()
.endSpec()
.build()
kubernetesTestComponents
.kubernetesClient
.services()
.create(cephService)
kubernetesTestComponents
.kubernetesClient
.apps()
.statefulSets()
.create(cephStatefulSet)
}
private def deleteCephStorage(): Unit = {
kubernetesTestComponents
.kubernetesClient
.apps()
.statefulSets()
.withName(cName)
.delete()
kubernetesTestComponents
.kubernetesClient
.services()
.withName(svcName)
.delete()
}
test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
try {
setupCephStorage()
val cephUrlStr = getServiceUrl(svcName)
val cephUrl = new URL(cephUrlStr)
val cephHost = cephUrl.getHost
val cephPort = cephUrl.getPort
val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
val (accessKey, secretKey) = getCephCredentials()
sparkAppConf
.set("spark.hadoop.fs.s3a.access.key", accessKey)
.set("spark.hadoop.fs.s3a.secret.key", secretKey)
.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort")
.set("spark.kubernetes.file.upload.path", s"s3a://$bucket")
.set("spark.files", s"$HOST_PATH/$fileName")
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
"1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
.set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
createS3Bucket(accessKey, secretKey, cephUrlStr)
runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
appArgs = Array(fileName),
timeout = Option(DEPS_TIMEOUT))
} finally {
// make sure this always runs
deleteCephStorage()
}
}
// There isn't a cleaner way to get the credentials
// when ceph-nano runs on k8s
private def getCephCredentials(): (String, String) = {
Eventually.eventually(TIMEOUT, INTERVAL) {
val cephPod = kubernetesTestComponents
.kubernetesClient
.pods()
.withName(s"$cName-0")
.get()
implicit val podName: String = cephPod.getMetadata.getName
implicit val components: KubernetesTestComponents = kubernetesTestComponents
val contents = Utils.executeCommand("cat", "/nano_user_details")
(extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key"))
}
}
private def extractS3Key(data: String, key: String): String = {
data.split("\n")
.filter(_.contains(key))
.head
.split(":")
.last
.trim
.replaceAll("[,|\"]", "")
}
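For reference, a standalone copy of the parsing rule in `extractS3Key`, applied to a line in the format assumed for ceph-nano's `/nano_user_details` output (the sample value is made up):
```
object ExtractKeySketch {
  def extractS3Key(data: String, key: String): String =
    data.split("\n")
      .filter(_.contains(key))
      .head
      .split(":")
      .last
      .trim
      .replaceAll("[,|\"]", "")

  def main(args: Array[String]): Unit = {
    val sample = """  "access_key": "9PMCTQ2NKVQY08Q1P15L","""
    // prints: 9PMCTQ2NKVQY08Q1P15L
    println(extractS3Key(sample, "access_key"))
  }
}
```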
private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
Eventually.eventually(TIMEOUT, INTERVAL) {
try {
val credentials = new BasicAWSCredentials(accessKey, secretKey)
val s3client = new AmazonS3Client(credentials)
s3client.setEndpoint(endPoint)
s3client.createBucket(bucket)
} catch {
case e: Exception =>
throw new SparkException(s"Failed to create bucket $bucket.", e)
}
}
}
private def getServiceUrl(serviceName: String): String = {
Eventually.eventually(TIMEOUT, INTERVAL) {
// the namespace is always available, either random or provided by the user
Minikube.minikubeServiceAction(serviceName, "-n", kubernetesTestComponents.namespace, "--url")
}
}
}
private[spark] object DepsTestsSuite {
val HOST_PATH = "/tmp"
val FILE_CONTENTS = "test deps"
// increase the default because jar resolution takes time in the container
val DEPS_TIMEOUT = PatienceConfiguration.Timeout(Span(4, Minutes))
}


@ -30,9 +30,10 @@ import io.fabric8.kubernetes.client.Watcher.Action
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
import org.scalatest.Matchers
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.concurrent.PatienceConfiguration.{Interval, Timeout}
import org.scalatest.time.{Minutes, Seconds, Span}
import org.apache.spark.{SPARK_VERSION, SparkFunSuite}
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
import org.apache.spark.internal.Logging
@ -41,7 +42,7 @@ import org.apache.spark.internal.config._
class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
with Logging with Eventually with Matchers {
with DepsTestsSuite with Logging with Eventually with Matchers {
import KubernetesSuite._
@ -120,12 +121,8 @@ class KubernetesSuite extends SparkFunSuite
pyImage = testImageRef(sys.props.getOrElse(CONFIG_KEY_IMAGE_PYTHON, "spark-py"))
rImage = testImageRef(sys.props.getOrElse(CONFIG_KEY_IMAGE_R, "spark-r"))
val scalaVersion = scala.util.Properties.versionNumberString
.split("\\.")
.take(2)
.mkString(".")
containerLocalSparkDistroExamplesJar =
s"local:///opt/spark/examples/jars/spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
s"local:///opt/spark/examples/jars/${Utils.getExamplesJarName()}"
testBackend = IntegrationTestBackendFactory.getTestBackend
testBackend.initialize()
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
@ -198,7 +195,7 @@ class KubernetesSuite extends SparkFunSuite
appLocator,
isJVM,
None,
interval)
Option((interval, None)))
}
protected def runSparkRemoteCheckAndVerifyCompletion(
@ -206,7 +203,8 @@ class KubernetesSuite extends SparkFunSuite
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
appArgs: Array[String],
appLocator: String = appLocator): Unit = {
appLocator: String = appLocator,
timeout: Option[PatienceConfiguration.Timeout] = None): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
SPARK_REMOTE_MAIN_CLASS,
@ -215,7 +213,8 @@ class KubernetesSuite extends SparkFunSuite
driverPodChecker,
executorPodChecker,
appLocator,
true)
true,
executorPatience = Option((None, timeout)))
}
protected def runSparkJVMCheckAndVerifyCompletion(
@ -265,7 +264,7 @@ class KubernetesSuite extends SparkFunSuite
appLocator: String,
isJVM: Boolean,
pyFiles: Option[String] = None,
interval: Option[PatienceConfiguration.Interval] = None): Unit = {
executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = {
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
@ -306,8 +305,16 @@ class KubernetesSuite extends SparkFunSuite
}
})
val patienceInterval = interval.getOrElse(INTERVAL)
Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) }
val (patienceInterval, patienceTimeout) = {
executorPatience match {
case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT))
case _ => (INTERVAL, TIMEOUT)
}
}
Eventually.eventually(patienceTimeout, patienceInterval) {
execPods.values.nonEmpty should be (true)
}
execWatcher.close()
execPods.values.foreach(executorPodChecker(_))
Eventually.eventually(TIMEOUT, patienceInterval) {
@ -408,6 +415,7 @@ class KubernetesSuite extends SparkFunSuite
private[spark] object KubernetesSuite {
val k8sTestTag = Tag("k8s")
val MinikubeTag = Tag("minikube")
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest"
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"


@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.JARS
@ -93,6 +94,8 @@ private[spark] class SparkAppConf {
override def toString: String = map.toString
def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}"))
def toSparkConf: SparkConf = new SparkConf().setAll(map)
}
private[spark] case class SparkAppArguments(


@ -16,13 +16,10 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
import java.io.{File, PrintWriter}
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder
import org.scalatest.Tag
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Milliseconds, Span}
@ -125,25 +122,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
}
}
private def createTempFile(): String = {
val filename = try {
val f = File.createTempFile("tmp", ".txt", new File(HOST_PATH))
f.deleteOnExit()
new PrintWriter(f) {
try {
write(FILE_CONTENTS)
} finally {
close()
}
}
f.getName
} catch {
case e: Exception => e.printStackTrace(); throw e;
}
filename
}
test("Test PVs with local storage", k8sTestTag, MinikubeTag) {
test("PVs with local storage", k8sTestTag, MinikubeTag) {
sparkAppConf
.set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",
CONTAINER_MOUNT_PATH)
@ -153,7 +132,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
CONTAINER_MOUNT_PATH)
.set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName",
PVC_NAME)
val file = createTempFile()
val file = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
try {
setupLocalStorage()
runDFSReadWriteAndVerifyCompletion(
@ -170,14 +149,13 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
interval = Some(PV_TESTS_INTERVAL)
)
} finally {
// make sure this always run
// make sure this always runs
deleteLocalStorage()
}
}
}
private[spark] object PVTestsSuite {
val MinikubeTag = Tag("minikube")
val STORAGE_NAME = "test-local-storage"
val PV_NAME = "test-local-pv"
val PVC_NAME = "test-local-pvc"


@ -16,15 +16,26 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
import java.io.Closeable
import java.net.URI
import java.io.{Closeable, File, PrintWriter}
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.spark.{SPARK_VERSION, SparkException}
import org.apache.spark.internal.Logging
object Utils extends Logging {
def getExamplesJarName(): String = {
val scalaVersion = scala.util.Properties.versionNumberString
.split("\\.")
.take(2)
.mkString(".")
s"spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
}
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
val resource = createResource
try f.apply(resource) finally resource.close()
@ -49,4 +60,43 @@ object Utils extends Logging {
out.flush()
out.toString()
}
def createTempFile(contents: String, hostPath: String): String = {
val filename = try {
val f = File.createTempFile("tmp", ".txt", new File(hostPath))
f.deleteOnExit()
new PrintWriter(f) {
try {
write(contents)
} finally {
close()
}
}
f.getName
} catch {
case e: Exception => e.printStackTrace(); throw e;
}
filename
}
def getExamplesJarAbsolutePath(sparkHomeDir: Path): String = {
val jarName = getExamplesJarName()
val jarPathsFound = Files
.walk(sparkHomeDir)
.filter(Files.isRegularFile(_))
.filter((f: Path) => {f.toFile.getName == jarName})
// we should not have more than one here under current test build dir
// we only need one though
val jarPath = jarPathsFound
.iterator()
.asScala
.map(_.toAbsolutePath.toString)
.toArray
.headOption
jarPath match {
case Some(jar) => jar
case _ => throw new SparkException(s"No valid $jarName file was found " +
s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!")
}
}
}


@ -112,7 +112,12 @@ private[spark] object Minikube extends Logging {
private def executeMinikube(action: String, args: String*): Seq[String] = {
ProcessUtils.executeProcess(
Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
Array("bash", "-c", s"minikube $action ${args.mkString(" ")}"),
MINIKUBE_STARTUP_TIMEOUT_SECONDS)
}
def minikubeServiceAction(args: String*): String = {
executeMinikube("service", args: _*).head
}
}