diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 59b638b819..926e2dfd34 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -29,6 +29,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, Map} import scala.util.{Properties, Try} +import org.apache.commons.io.FilenameUtils import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} import org.apache.hadoop.fs.{FileSystem, Path} @@ -222,7 +223,7 @@ private[spark] class SparkSubmit extends Logging { // Return values val childArgs = new ArrayBuffer[String]() val childClasspath = new ArrayBuffer[String]() - val sparkConf = new SparkConf() + val sparkConf = args.toSparkConf() var childMainClass = "" // Set the cluster manager @@ -313,6 +314,9 @@ private[spark] class SparkSubmit extends Logging { val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER + val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT + val isKubernetesClusterModeDriver = isKubernetesClient && + sparkConf.getBoolean("spark.kubernetes.submitInDriver", false) val isMesosClient = clusterManager == MESOS && deployMode == CLIENT if (!isMesosCluster && !isStandAloneCluster) { @@ -323,9 +327,25 @@ private[spark] class SparkSubmit extends Logging { args.ivySettingsPath) if (!StringUtils.isBlank(resolvedMavenCoordinates)) { - args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates) - if (args.isPython || isInternal(args.primaryResource)) { - args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates) + // In K8s client mode, when in the driver, add resolved jars early as we might need + // them at the submit time for artifact downloading. + // For example we might use the dependencies for downloading + // files from a Hadoop Compatible fs eg. S3. In this case the user might pass: + // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6 + if (isKubernetesClusterModeDriver) { + val loader = getSubmitClassLoader(sparkConf) + for (jar <- resolvedMavenCoordinates.split(",")) { + addJarToClasspath(jar, loader) + } + } else if (isKubernetesCluster) { + // We need this in K8s cluster mode so that we can upload local deps + // via the k8s application, like in cluster mode driver + childClasspath ++= resolvedMavenCoordinates.split(",") + } else { + args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates) + if (args.isPython || isInternal(args.primaryResource)) { + args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates) + } } } @@ -380,6 +400,17 @@ private[spark] class SparkSubmit extends Logging { localPyFiles = Option(args.pyFiles).map { downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) }.orNull + + if (isKubernetesClusterModeDriver) { + // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris. + // Executors will get the jars from the Spark file server. + // Explicitly download the related files here + args.jars = renameResourcesToLocalFS(args.jars, localJars) + val localFiles = Option(args.files).map { + downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) + }.orNull + args.files = renameResourcesToLocalFS(args.files, localFiles) + } } // When running in YARN, for some remote resources with scheme: @@ -535,11 +566,13 @@ private[spark] class SparkSubmit extends Logging { OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key), // Propagate attributes for dependency resolution at the driver side - OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"), - OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER, - confKey = "spark.jars.repositories"), - OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"), - OptionAssigner(args.packagesExclusions, STANDALONE | MESOS, + OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES, + CLUSTER, confKey = "spark.jars.packages"), + OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES, + CLUSTER, confKey = "spark.jars.repositories"), + OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES, + CLUSTER, confKey = "spark.jars.ivy"), + OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES, CLUSTER, confKey = "spark.jars.excludes"), // Yarn only @@ -777,6 +810,21 @@ private[spark] class SparkSubmit extends Logging { (childArgs, childClasspath, sparkConf, childMainClass) } + private def renameResourcesToLocalFS(resources: String, localResources: String): String = { + if (resources != null && localResources != null) { + val localResourcesSeq = Utils.stringToSeq(localResources) + Utils.stringToSeq(resources).map { resource => + val filenameRemote = FilenameUtils.getName(new URI(resource).getPath) + localResourcesSeq.find { localUri => + val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath) + filenameRemote == filenameLocal + }.getOrElse(resource) + }.mkString(",") + } else { + resources + } + } + // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes // mode, we must trick it into thinking we're YARN. @@ -787,6 +835,19 @@ private[spark] class SparkSubmit extends Logging { sparkConf.set(key, shortUserName) } + private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = { + val loader = + if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) { + new ChildFirstURLClassLoader(new Array[URL](0), + Thread.currentThread.getContextClassLoader) + } else { + new MutableURLClassLoader(new Array[URL](0), + Thread.currentThread.getContextClassLoader) + } + Thread.currentThread.setContextClassLoader(loader) + loader + } + /** * Run the main method of the child class using the submit arguments. * @@ -814,17 +875,7 @@ private[spark] class SparkSubmit extends Logging { logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}") logInfo("\n") } - - val loader = - if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) { - new ChildFirstURLClassLoader(new Array[URL](0), - Thread.currentThread.getContextClassLoader) - } else { - new MutableURLClassLoader(new Array[URL](0), - Thread.currentThread.getContextClassLoader) - } - Thread.currentThread.setContextClassLoader(loader) - + val loader = getSubmitClassLoader(sparkConf) for (jar <- childClasspath) { addJarToClasspath(jar, loader) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index b0c187ddb1..65c9cb9ac5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1325,12 +1325,12 @@ class SparkSubmitSuite "--class", "Foo", "app.jar") val conf = new SparkSubmitArguments(clArgs).toSparkConf() - Seq( - testConf, - masterConf - ).foreach { case (k, v) => - conf.get(k) should be (v) - } + Seq( + testConf, + masterConf + ).foreach { case (k, v) => + conf.get(k) should be (v) + } } } diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 72833ccca6..8a424b57fe 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -208,8 +208,31 @@ If your application's dependencies are all hosted in remote locations like HDFS by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images. Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the `SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to -dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission -client's local file system is currently not yet supported. +dependencies in custom-built Docker images in `spark-submit`. We support dependencies from the submission +client's local file system using the `file://` scheme or without a scheme (using a full path), where the destination should be a Hadoop compatible filesystem. +A typical example of this using S3 is via passing the following options: + +``` +... +--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6 +--conf spark.kubernetes.file.upload.path=s3a:///path +--conf spark.hadoop.fs.s3a.access.key=... +--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem +--conf spark.hadoop.fs.s3a.fast.upload=true +--conf spark.hadoop.fs.s3a.secret.key=.... +--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp +file:///full/path/to/app.jar +``` +The app jar file will be uploaded to the S3 and then when the driver is launched it will be downloaded +to the driver pod and will be added to its classpath. Spark will generate a subdir under the upload path with a random name +to avoid conflicts with spark apps running in parallel. User could manage the subdirs created according to his needs. + +The client scheme is supported for the application jar, and dependencies specified by properties `spark.jars` and `spark.files`. + +Important: all client-side dependencies will be uploaded to the given path with a flat directory structure so +file names must be unique otherwise files will be overwritten. Also make sure in the derived k8s image default ivy dir +has the required access rights or modify the settings as above. The latter is also important if you use `--packages` in +cluster mode. ## Secret Management Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a @@ -455,7 +478,6 @@ There are several Spark on Kubernetes features that are currently being worked o Some of these include: * Dynamic Resource Allocation and External Shuffle Service -* Local File Dependency Management * Job Queues and Resource Management # Configuration @@ -1078,6 +1100,15 @@ See the [configuration page](configuration.html) for information on Spark config Specify the grace period in seconds when deleting a Spark application using spark-submit. + + spark.kubernetes.file.upload.path + (none) + + Path to store files at the spark submit side in cluster mode. For example: + spark.kubernetes.file.upload.path=s3a:///path + File should specified as file://path/to/file or absolute path. + + #### Pod template properties diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index cc1bfd9bb0..7e99220d03 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -338,6 +338,13 @@ private[spark] object Config extends Logging { .timeConf(TimeUnit.SECONDS) .createOptional + val KUBERNETES_FILE_UPLOAD_PATH = + ConfigBuilder("spark.kubernetes.file.upload.path") + .doc("Hadoop compatible file system path where files from the local file system " + + "will be uploded to in cluster mode.") + .stringConf + .createOptional + val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets." diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 3f7fcecb73..a5710357fd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,18 +16,25 @@ */ package org.apache.spark.deploy.k8s -import java.io.File +import java.io.{File, IOException} +import java.net.URI import java.security.SecureRandom +import java.util.UUID import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder} import io.fabric8.kubernetes.client.KubernetesClient import org.apache.commons.codec.binary.Hex +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH import org.apache.spark.internal.Logging +import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.Utils.getHadoopFileSystem private[spark] object KubernetesUtils extends Logging { @@ -209,4 +216,77 @@ private[spark] object KubernetesUtils extends Logging { Hex.encodeHexString(random) + time } + /** + * Upload files and modify their uris + */ + def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None) + : Iterable[String] = { + fileUris.map { uri => + uploadFileUri(uri, conf) + } + } + + private def isLocalDependency(uri: URI): Boolean = { + uri.getScheme match { + case null | "file" => true + case _ => false + } + } + + def isLocalAndResolvable(resource: String): Boolean = { + resource != SparkLauncher.NO_RESOURCE && + isLocalDependency(Utils.resolveURI(resource)) + } + + def renameMainAppResource(resource: String, conf: SparkConf): String = { + if (isLocalAndResolvable(resource)) { + SparkLauncher.NO_RESOURCE + } else { + resource + } + } + + def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = { + conf match { + case Some(sConf) => + if (sConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) { + val fileUri = Utils.resolveURI(uri) + try { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(sConf) + val uploadPath = sConf.get(KUBERNETES_FILE_UPLOAD_PATH).get + val fs = getHadoopFileSystem(Utils.resolveURI(uploadPath), hadoopConf) + val randomDirName = s"spark-upload-${UUID.randomUUID()}" + fs.mkdirs(new Path(s"${uploadPath}/${randomDirName}")) + val targetUri = s"${uploadPath}/${randomDirName}/${fileUri.getPath.split("/").last}" + log.info(s"Uploading file: ${fileUri.getPath} to dest: $targetUri...") + uploadFileToHadoopCompatibleFS(new Path(fileUri.getPath), new Path(targetUri), fs) + targetUri + } catch { + case e: Exception => + throw new SparkException(s"Uploading file ${fileUri.getPath} failed...", e) + } + } else { + throw new SparkException("Please specify " + + "spark.kubernetes.file.upload.path property.") + } + case _ => throw new SparkException("Spark configuration is missing...") + } + } + + /** + * Upload a file to a Hadoop-compatible filesystem. + */ + private def uploadFileToHadoopCompatibleFS( + src: Path, + dest: Path, + fs: FileSystem, + delSrc : Boolean = false, + overwrite: Boolean = true): Unit = { + try { + fs.copyFromLocalFile(false, true, src, dest) + } catch { + case e: IOException => + throw new SparkException(s"Error uploading file ${src.getName}", e) + } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 61fc67f4dc..92463df0f6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ -import org.apache.spark.internal.config.UI._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils @@ -156,6 +155,15 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix, KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) + // try upload local, resolvable files to a hadoop compatible file system + Seq(JARS, FILES).foreach { key => + val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri)) + val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf)) + if (resolved.nonEmpty) { + additionalProps.put(key.key, resolved.mkString(",")) + } + } additionalProps.toMap } } + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala index 9c9cd1e1ac..7faf0d75bd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -24,9 +24,7 @@ import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ -import org.apache.spark.internal.config._ import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.util.Utils /** * Creates the driver command for running the user app, and propagates needed configuration so @@ -88,11 +86,17 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf) } private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = { + // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit + val resolvedResource = if (conf.mainAppResource.isInstanceOf[JavaMainAppResource]) { + KubernetesUtils.renameMainAppResource(resource, conf.sparkConf) + } else { + resource + } new ContainerBuilder(pod.container) .addToArgs("driver") .addToArgs("--properties-file", SPARK_CONF_PATH) .addToArgs("--class", conf.mainClass) - .addToArgs(resource) + .addToArgs(resolvedResource) .addToArgs(conf.appArgs: _*) } } diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index 2248482558..d129ffb360 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -76,6 +76,12 @@ spark-tags_${scala.binary.version} test-jar + + com.amazonaws + aws-java-sdk + 1.7.4 + test + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala new file mode 100644 index 0000000000..b0c4182240 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.net.URL + +import scala.collection.JavaConverters._ + +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.services.s3.AmazonS3Client +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Span} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT, FILE_CONTENTS, HOST_PATH} +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT} +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube + +private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => + import KubernetesSuite.k8sTestTag + + val cName = "ceph-nano" + val svcName = s"$cName-s3" + val bucket = "spark" + + private def getCephContainer(): Container = { + val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4", + "RGW_CIVETWEB_PORT" -> "8000", + "SREE_PORT" -> "5001", + "CEPH_DEMO_UID" -> "nano", + "CEPH_DAEMON" -> "demo", + "DEBUG" -> "verbose" + ).map( envV => + new EnvVarBuilder() + .withName(envV._1) + .withValue(envV._2) + .build() + ).toArray + + val resources = Map( + "cpu" -> new QuantityBuilder() + .withAmount("1") + .build(), + "memory" -> new QuantityBuilder() + .withAmount("512M") + .build() + ).asJava + + new ContainerBuilder() + .withImage("ceph/daemon:v4.0.0-stable-4.0-master-centos-7-x86_64") + .withImagePullPolicy("Always") + .withName(cName) + .withPorts(new ContainerPortBuilder() + .withName(svcName) + .withProtocol("TCP") + .withContainerPort(8000) + .build() + ) + .withResources(new ResourceRequirementsBuilder() + .withLimits(resources) + .withRequests(resources) + .build() + ) + .withEnv(envVars: _*) + .build() + } + + // Based on https://github.com/ceph/cn + private def setupCephStorage(): Unit = { + val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava + val cephService = new ServiceBuilder() + .withNewMetadata() + .withName(svcName) + .withLabels(labels) + .endMetadata() + .withNewSpec() + .withPorts(new ServicePortBuilder() + .withName("https") + .withPort(8000) + .withProtocol("TCP") + .withTargetPort(new IntOrString(8000)) + .build() + ) + .withType("NodePort") + .withSelector(labels) + .endSpec() + .build() + + val cephStatefulSet = new StatefulSetBuilder() + .withNewMetadata() + .withName(cName) + .withLabels(labels) + .endMetadata() + .withNewSpec() + .withReplicas(1) + .withNewSelector() + .withMatchLabels(Map("app" -> "ceph").asJava) + .endSelector() + .withServiceName(cName) + .withNewTemplate() + .withNewMetadata() + .withName(cName) + .withLabels(labels) + .endMetadata() + .withNewSpec() + .withContainers(getCephContainer()) + .endSpec() + .endTemplate() + .endSpec() + .build() + + kubernetesTestComponents + .kubernetesClient + .services() + .create(cephService) + + kubernetesTestComponents + .kubernetesClient + .apps() + .statefulSets() + .create(cephStatefulSet) + } + + private def deleteCephStorage(): Unit = { + kubernetesTestComponents + .kubernetesClient + .apps() + .statefulSets() + .withName(cName) + .delete() + + kubernetesTestComponents + .kubernetesClient + .services() + .withName(svcName) + .delete() + } + + test("Launcher client dependencies", k8sTestTag, MinikubeTag) { + val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH) + try { + setupCephStorage() + val cephUrlStr = getServiceUrl(svcName) + val cephUrl = new URL(cephUrlStr) + val cephHost = cephUrl.getHost + val cephPort = cephUrl.getPort + val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir) + val (accessKey, secretKey) = getCephCredentials() + sparkAppConf + .set("spark.hadoop.fs.s3a.access.key", accessKey) + .set("spark.hadoop.fs.s3a.secret.key", secretKey) + .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") + .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort") + .set("spark.kubernetes.file.upload.path", s"s3a://$bucket") + .set("spark.files", s"$HOST_PATH/$fileName") + .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" + + "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6") + .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp") + createS3Bucket(accessKey, secretKey, cephUrlStr) + runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar, + appArgs = Array(fileName), + timeout = Option(DEPS_TIMEOUT)) + } finally { + // make sure this always runs + deleteCephStorage() + } + } + + // There isn't a cleaner way to get the credentials + // when ceph-nano runs on k8s + private def getCephCredentials(): (String, String) = { + Eventually.eventually(TIMEOUT, INTERVAL) { + val cephPod = kubernetesTestComponents + .kubernetesClient + .pods() + .withName(s"$cName-0") + .get() + implicit val podName: String = cephPod.getMetadata.getName + implicit val components: KubernetesTestComponents = kubernetesTestComponents + val contents = Utils.executeCommand("cat", "/nano_user_details") + (extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key")) + } + } + + private def extractS3Key(data: String, key: String): String = { + data.split("\n") + .filter(_.contains(key)) + .head + .split(":") + .last + .trim + .replaceAll("[,|\"]", "") + } + + private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = { + Eventually.eventually(TIMEOUT, INTERVAL) { + try { + val credentials = new BasicAWSCredentials(accessKey, secretKey) + val s3client = new AmazonS3Client(credentials) + s3client.setEndpoint(endPoint) + s3client.createBucket(bucket) + } catch { + case e: Exception => + throw new SparkException(s"Failed to create bucket $bucket.", e) + } + } + } + + private def getServiceUrl(serviceName: String): String = { + Eventually.eventually(TIMEOUT, INTERVAL) { + // ns is always available either random or provided by the user + Minikube.minikubeServiceAction(serviceName, "-n", kubernetesTestComponents.namespace, "--url") + } + } +} + +private[spark] object DepsTestsSuite { + val HOST_PATH = "/tmp" + val FILE_CONTENTS = "test deps" + // increase the default because jar resolution takes time in the container + val DEPS_TIMEOUT = PatienceConfiguration.Timeout(Span(4, Minutes)) +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index bc0bb20908..51e758fe0d 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -30,9 +30,10 @@ import io.fabric8.kubernetes.client.Watcher.Action import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag} import org.scalatest.Matchers import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.concurrent.PatienceConfiguration.{Interval, Timeout} import org.scalatest.time.{Minutes, Seconds, Span} -import org.apache.spark.{SPARK_VERSION, SparkFunSuite} +import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} import org.apache.spark.internal.Logging @@ -41,7 +42,7 @@ import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with Logging with Eventually with Matchers { + with DepsTestsSuite with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -120,12 +121,8 @@ class KubernetesSuite extends SparkFunSuite pyImage = testImageRef(sys.props.getOrElse(CONFIG_KEY_IMAGE_PYTHON, "spark-py")) rImage = testImageRef(sys.props.getOrElse(CONFIG_KEY_IMAGE_R, "spark-r")) - val scalaVersion = scala.util.Properties.versionNumberString - .split("\\.") - .take(2) - .mkString(".") containerLocalSparkDistroExamplesJar = - s"local:///opt/spark/examples/jars/spark-examples_$scalaVersion-${SPARK_VERSION}.jar" + s"local:///opt/spark/examples/jars/${Utils.getExamplesJarName()}" testBackend = IntegrationTestBackendFactory.getTestBackend testBackend.initialize() kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) @@ -198,7 +195,7 @@ class KubernetesSuite extends SparkFunSuite appLocator, isJVM, None, - interval) + Option((interval, None))) } protected def runSparkRemoteCheckAndVerifyCompletion( @@ -206,7 +203,8 @@ class KubernetesSuite extends SparkFunSuite driverPodChecker: Pod => Unit = doBasicDriverPodCheck, executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, appArgs: Array[String], - appLocator: String = appLocator): Unit = { + appLocator: String = appLocator, + timeout: Option[PatienceConfiguration.Timeout] = None): Unit = { runSparkApplicationAndVerifyCompletion( appResource, SPARK_REMOTE_MAIN_CLASS, @@ -215,7 +213,8 @@ class KubernetesSuite extends SparkFunSuite driverPodChecker, executorPodChecker, appLocator, - true) + true, + executorPatience = Option((None, timeout))) } protected def runSparkJVMCheckAndVerifyCompletion( @@ -265,7 +264,7 @@ class KubernetesSuite extends SparkFunSuite appLocator: String, isJVM: Boolean, pyFiles: Option[String] = None, - interval: Option[PatienceConfiguration.Interval] = None): Unit = { + executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = { val appArguments = SparkAppArguments( mainAppResource = appResource, mainClass = mainClass, @@ -306,8 +305,16 @@ class KubernetesSuite extends SparkFunSuite } }) - val patienceInterval = interval.getOrElse(INTERVAL) - Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) } + val (patienceInterval, patienceTimeout) = { + executorPatience match { + case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) + case _ => (INTERVAL, TIMEOUT) + } + } + + Eventually.eventually(patienceTimeout, patienceInterval) { + execPods.values.nonEmpty should be (true) + } execWatcher.close() execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(TIMEOUT, patienceInterval) { @@ -408,6 +415,7 @@ class KubernetesSuite extends SparkFunSuite private[spark] object KubernetesSuite { val k8sTestTag = Tag("k8s") + val MinikubeTag = Tag("minikube") val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest" val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index 50a7ef7d17..4cfda8ab9b 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.scalatest.concurrent.Eventually +import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.JARS @@ -93,6 +94,8 @@ private[spark] class SparkAppConf { override def toString: String = map.toString def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}")) + + def toSparkConf: SparkConf = new SparkConf().setAll(map) } private[spark] case class SparkAppArguments( diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala index d7a237f999..7776bc679d 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala @@ -16,13 +16,10 @@ */ package org.apache.spark.deploy.k8s.integrationtest -import java.io.{File, PrintWriter} - import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder -import org.scalatest.Tag import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Milliseconds, Span} @@ -125,25 +122,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => } } - private def createTempFile(): String = { - val filename = try { - val f = File.createTempFile("tmp", ".txt", new File(HOST_PATH)) - f.deleteOnExit() - new PrintWriter(f) { - try { - write(FILE_CONTENTS) - } finally { - close() - } - } - f.getName - } catch { - case e: Exception => e.printStackTrace(); throw e; - } - filename - } - - test("Test PVs with local storage", k8sTestTag, MinikubeTag) { + test("PVs with local storage", k8sTestTag, MinikubeTag) { sparkAppConf .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path", CONTAINER_MOUNT_PATH) @@ -153,7 +132,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => CONTAINER_MOUNT_PATH) .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", PVC_NAME) - val file = createTempFile() + val file = Utils.createTempFile(FILE_CONTENTS, HOST_PATH) try { setupLocalStorage() runDFSReadWriteAndVerifyCompletion( @@ -170,14 +149,13 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => interval = Some(PV_TESTS_INTERVAL) ) } finally { - // make sure this always run + // make sure this always runs deleteLocalStorage() } } } private[spark] object PVTestsSuite { - val MinikubeTag = Tag("minikube") val STORAGE_NAME = "test-local-storage" val PV_NAME = "test-local-pv" val PVC_NAME = "test-local-pvc" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index d425f70718..a687a1bca1 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -16,15 +16,26 @@ */ package org.apache.spark.deploy.k8s.integrationtest -import java.io.Closeable -import java.net.URI +import java.io.{Closeable, File, PrintWriter} +import java.nio.file.{Files, Path} + +import scala.collection.JavaConverters._ import org.apache.commons.io.output.ByteArrayOutputStream +import org.apache.spark.{SPARK_VERSION, SparkException} import org.apache.spark.internal.Logging object Utils extends Logging { + def getExamplesJarName(): String = { + val scalaVersion = scala.util.Properties.versionNumberString + .split("\\.") + .take(2) + .mkString(".") + s"spark-examples_$scalaVersion-${SPARK_VERSION}.jar" + } + def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { val resource = createResource try f.apply(resource) finally resource.close() @@ -49,4 +60,43 @@ object Utils extends Logging { out.flush() out.toString() } + + def createTempFile(contents: String, hostPath: String): String = { + val filename = try { + val f = File.createTempFile("tmp", ".txt", new File(hostPath)) + f.deleteOnExit() + new PrintWriter(f) { + try { + write(contents) + } finally { + close() + } + } + f.getName + } catch { + case e: Exception => e.printStackTrace(); throw e; + } + filename + } + + def getExamplesJarAbsolutePath(sparkHomeDir: Path): String = { + val jarName = getExamplesJarName() + val jarPathsFound = Files + .walk(sparkHomeDir) + .filter(Files.isRegularFile(_)) + .filter((f: Path) => {f.toFile.getName == jarName}) + // we should not have more than one here under current test build dir + // we only need one though + val jarPath = jarPathsFound + .iterator() + .asScala + .map(_.toAbsolutePath.toString) + .toArray + .headOption + jarPath match { + case Some(jar) => jar + case _ => throw new SparkException(s"No valid $jarName file was found " + + s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!") + } + } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index 78ef44be7f..ce2ce1c61c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -112,7 +112,12 @@ private[spark] object Minikube extends Logging { private def executeMinikube(action: String, args: String*): Seq[String] = { ProcessUtils.executeProcess( - Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS) + Array("bash", "-c", s"minikube $action ${args.mkString(" ")}"), + MINIKUBE_STARTUP_TIMEOUT_SECONDS) + } + + def minikubeServiceAction(args: String*): String = { + executeMinikube("service", args: _*).head } }