[SPARK-25897][K8S] Hook up k8s integration tests to sbt build.

The integration tests can now be run in sbt if the right profile
is enabled, using the "test" task under the respective project.

This avoids having to fall back to maven to run the tests, which
invalidates all your compiled code when you go back to sbt, making
development much slower than it should be.

There's also a task to run the tests directly without refreshing
the docker images, which is helpful if you've just made a change to
the submission code that should not affect the code in the images.

The sbt tasks currently are not very customizable; there are a few
minor things you can set in the sbt shell itself, but otherwise the
tests are hardcoded to run on minikube.
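
For illustration, a minimal sketch of how this might be invoked from the
command line. The -P profile flags and the "kubernetes-integration-tests"
project name are assumptions based on the Maven module, and the "dev" image
tag is the default set by the new sbt settings below:

    # Build the docker images, then run the integration tests (the "test"
    # task depends on the image-build task added in SparkBuild).
    ./build/sbt -Pkubernetes -Pkubernetes-integration-tests \
        "kubernetes-integration-tests/test"

    # Skip the image build and re-run the tests against the existing
    # "dev"-tagged images.
    ./build/sbt -Pkubernetes -Pkubernetes-integration-tests \
        "kubernetes-integration-tests/run-its"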

I also had to make some slight adjustments to the IT code itself,
mostly to remove assumptions about the existing harness.

Tested on sbt and maven.

Closes #22909 from vanzin/SPARK-25897.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Marcelo Vanzin, 2018-11-07 13:19:31 -08:00
commit e4561e1c55 (parent 0a32238d03)
11 changed files with 125 additions and 82 deletions


@@ -197,6 +197,9 @@ do
if ! which minikube 1>/dev/null; then
error "Cannot find minikube."
fi
if ! minikube status 1>/dev/null; then
error "Cannot contact minikube. Make sure it's running."
fi
eval $(minikube docker-env)
;;
esac


@@ -374,6 +374,8 @@ object SparkBuild extends PomBuild {
// SPARK-14738 - Remove docker tests from main Spark build
// enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)
/**
* Adds the ability to run the spark shell directly from SBT without building an assembly
* jar.
@@ -458,6 +460,65 @@ object DockerIntegrationTests {
)
}
/**
* These settings run a hardcoded configuration of the Kubernetes integration tests using
* minikube. Docker images will have the "dev" tag, and will be overwritten every time the
* integration tests are run. The integration tests are actually bound to the "test" phase,
* so running "test" on this module will run the integration tests.
*
* There are two ways to run the tests:
* - the "tests" task builds docker images and runs the test, so it's a little slow.
* - the "run-its" task just runs the tests on a pre-built set of images.
*
* Note that this does not use the shell scripts that the maven build uses, which are more
* configurable. This is meant as a quick way for developers to run these tests against their
* local changes.
*/
object KubernetesIntegrationTests {
import BuildCommons._
val dockerBuild = TaskKey[Unit]("docker-imgs", "Build the docker images for ITs.")
val runITs = TaskKey[Unit]("run-its", "Only run ITs, skip image build.")
val imageTag = settingKey[String]("Tag to use for images built during the test.")
val namespace = settingKey[String]("Namespace where to run pods.")
// Hack: this variable is used to control whether to build docker images. It's updated by
// the tasks below in a non-obvious way, so that you get the functionality described in
// the scaladoc above.
private var shouldBuildImage = true
lazy val settings = Seq(
imageTag := "dev",
namespace := "default",
dockerBuild := {
if (shouldBuildImage) {
val dockerTool = s"$sparkHome/bin/docker-image-tool.sh"
val cmd = Seq(dockerTool, "-m", "-t", imageTag.value, "build")
val ec = Process(cmd).!
if (ec != 0) {
throw new IllegalStateException(s"Process '${cmd.mkString(" ")}' exited with $ec.")
}
}
shouldBuildImage = true
},
runITs := Def.taskDyn {
shouldBuildImage = false
Def.task {
(test in Test).value
}
}.value,
test in Test := (test in Test).dependsOn(dockerBuild).value,
javaOptions in Test ++= Seq(
"-Dspark.kubernetes.test.deployMode=minikube",
s"-Dspark.kubernetes.test.imageTag=${imageTag.value}",
s"-Dspark.kubernetes.test.namespace=${namespace.value}",
s"-Dspark.kubernetes.test.unpackSparkDir=$sparkHome"
),
// Force packaging before building images, so that the latest code is tested.
dockerBuild := dockerBuild.dependsOn(packageBin in Compile in assembly).value
)
}
/**
* Overrides to work around sbt's dependency resolution being different from Maven's.
*/


@@ -155,14 +155,10 @@
<executions>
<execution>
<id>test</id>
<phase>none</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<!-- The negative pattern below prevents integration tests such as
KubernetesSuite from running in the test phase. -->
<suffixes>(?&lt;!Suite)</suffixes>
</configuration>
</execution>
<execution>
<id>integration-test</id>


@@ -19,9 +19,11 @@ package org.apache.spark.deploy.k8s.integrationtest
import java.io.File
import java.nio.file.{Path, Paths}
import java.util.UUID
import java.util.regex.Pattern
import com.google.common.io.PatternFilenameFilter
import scala.collection.JavaConverters._
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
@@ -29,24 +31,22 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
import org.scalatest.Matchers
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}
import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.integrationtest.TestConfig._
import org.apache.spark.{SPARK_VERSION, SparkFunSuite}
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
import org.apache.spark.internal.Logging
private[spark] class KubernetesSuite extends SparkFunSuite
class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
with Logging with Eventually with Matchers {
import KubernetesSuite._
private var sparkHomeDir: Path = _
private var pyImage: String = _
private var rImage: String = _
protected var sparkHomeDir: Path = _
protected var pyImage: String = _
protected var rImage: String = _
protected var image: String = _
protected var testBackend: IntegrationTestBackend = _
@@ -67,6 +67,30 @@ private[spark] class KubernetesSuite extends SparkFunSuite
private val extraExecTotalMemory =
s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi"
/**
* Build the image ref for the given image name, taking the repo and tag from the
* test configuration.
*/
private def testImageRef(name: String): String = {
val tag = sys.props.get(CONFIG_KEY_IMAGE_TAG_FILE)
.map { path =>
val tagFile = new File(path)
require(tagFile.isFile,
s"No file found for image tag at ${tagFile.getAbsolutePath}.")
Files.toString(tagFile, Charsets.UTF_8).trim
}
.orElse(sys.props.get(CONFIG_KEY_IMAGE_TAG))
.getOrElse {
throw new IllegalArgumentException(
s"One of $CONFIG_KEY_IMAGE_TAG_FILE or $CONFIG_KEY_IMAGE_TAG is required.")
}
val repo = sys.props.get(CONFIG_KEY_IMAGE_REPO)
.map { _ + "/" }
.getOrElse("")
s"$repo$name:$tag"
}
override def beforeAll(): Unit = {
super.beforeAll()
// The scalatest-maven-plugin gives system properties that are referenced but not set null
@@ -83,17 +107,16 @@ private[spark] class KubernetesSuite extends SparkFunSuite
sparkHomeDir = Paths.get(sparkDirProp)
require(sparkHomeDir.toFile.isDirectory,
s"No directory found for spark home specified at $sparkHomeDir.")
val imageTag = getTestImageTag
val imageRepo = getTestImageRepo
image = s"$imageRepo/spark:$imageTag"
pyImage = s"$imageRepo/spark-py:$imageTag"
rImage = s"$imageRepo/spark-r:$imageTag"
image = testImageRef("spark")
pyImage = testImageRef("spark-py")
rImage = testImageRef("spark-r")
val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
.toFile
.listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)
containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" +
s"${sparkDistroExamplesJarFile.getName}"
val scalaVersion = scala.util.Properties.versionNumberString
.split("\\.")
.take(2)
.mkString(".")
containerLocalSparkDistroExamplesJar =
s"local:///opt/spark/examples/jars/spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
testBackend = IntegrationTestBackendFactory.getTestBackend
testBackend.initialize()
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)


@@ -28,7 +28,10 @@ object ProcessUtils extends Logging {
* executeProcess is used to run a command and return the output if it
* completes within timeout seconds.
*/
def executeProcess(fullCommand: Array[String], timeout: Long, dumpErrors: Boolean = false): Seq[String] = {
def executeProcess(
fullCommand: Array[String],
timeout: Long,
dumpErrors: Boolean = false): Seq[String] = {
val pb = new ProcessBuilder().command(fullCommand: _*)
pb.redirectErrorStream(true)
val proc = pb.start()
@@ -41,7 +44,8 @@ object ProcessUtils extends Logging {
assert(proc.waitFor(timeout, TimeUnit.SECONDS),
s"Timed out while executing ${fullCommand.mkString(" ")}")
assert(proc.exitValue == 0,
s"Failed to execute ${fullCommand.mkString(" ")}${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
s"Failed to execute ${fullCommand.mkString(" ")}" +
s"${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
outputLines
}
}


@@ -16,18 +16,14 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}
private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
import PythonTestsSuite._
import KubernetesSuite.k8sTestTag
private val pySparkDockerImage =
s"${getTestImageRepo}/spark-py:${getTestImageTag}"
test("Run PySpark on simple pi.py example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.container.image", pyImage)
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_PI,
mainClass = "",
@@ -41,7 +37,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.container.image", pyImage)
.set("spark.kubernetes.pyspark.pythonVersion", "2")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
@@ -59,7 +55,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.container.image", pyImage)
.set("spark.kubernetes.pyspark.pythonVersion", "3")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
@@ -77,7 +73,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
test("Run PySpark with memory customization", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.container.image", pyImage)
.set("spark.kubernetes.pyspark.pythonVersion", "3")
.set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant")
.set("spark.executor.pyspark.memory", s"${additionalMemory}m")


@@ -16,16 +16,13 @@
*/
package org.apache.spark.deploy.k8s.integrationtest
import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}
private[spark] trait RTestsSuite { k8sSuite: KubernetesSuite =>
import RTestsSuite._
import KubernetesSuite.k8sTestTag
test("Run SparkR on simple dataframe.R example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-r:${getTestImageTag}")
sparkAppConf.set("spark.kubernetes.container.image", rImage)
runSparkApplicationAndVerifyCompletion(
appResource = SPARK_R_DATAFRAME_TEST,
mainClass = "",


@@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest
import java.io.File
import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
object TestConfig {
def getTestImageTag: String = {
val imageTagFileProp = System.getProperty(CONFIG_KEY_IMAGE_TAG_FILE)
require(imageTagFileProp != null, "Image tag file must be provided in system properties.")
val imageTagFile = new File(imageTagFileProp)
require(imageTagFile.isFile, s"No file found for image tag at ${imageTagFile.getAbsolutePath}.")
Files.toString(imageTagFile, Charsets.UTF_8).trim
}
def getTestImageRepo: String = {
val imageRepo = System.getProperty(CONFIG_KEY_IMAGE_REPO)
require(imageRepo != null, "Image repo must be provided in system properties.")
imageRepo
}
}


@@ -26,7 +26,7 @@ object TestConstants {
val CONFIG_KEY_KUBE_MASTER_URL = "spark.kubernetes.test.master"
val CONFIG_KEY_KUBE_NAMESPACE = "spark.kubernetes.test.namespace"
val CONFIG_KEY_KUBE_SVC_ACCOUNT = "spark.kubernetes.test.serviceAccountName"
val CONFIG_KEY_IMAGE_TAG = "spark.kubernetes.test.imageTagF"
val CONFIG_KEY_IMAGE_TAG = "spark.kubernetes.test.imageTag"
val CONFIG_KEY_IMAGE_TAG_FILE = "spark.kubernetes.test.imageTagFile"
val CONFIG_KEY_IMAGE_REPO = "spark.kubernetes.test.imageRepo"
val CONFIG_KEY_UNPACK_DIR = "spark.kubernetes.test.unpackSparkDir"


@@ -18,6 +18,7 @@
package org.apache.spark.deploy.k8s.integrationtest.backend
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.deploy.k8s.integrationtest.backend.cloud.KubeConfigBackend
import org.apache.spark.deploy.k8s.integrationtest.backend.docker.DockerForDesktopBackend
@@ -35,7 +36,8 @@ private[spark] object IntegrationTestBackendFactory {
.getOrElse(BACKEND_MINIKUBE)
deployMode match {
case BACKEND_MINIKUBE => MinikubeTestBackend
case BACKEND_CLOUD => new KubeConfigBackend(System.getProperty(CONFIG_KEY_KUBE_CONFIG_CONTEXT))
case BACKEND_CLOUD =>
new KubeConfigBackend(System.getProperty(CONFIG_KEY_KUBE_CONFIG_CONTEXT))
case BACKEND_DOCKER_FOR_DESKTOP => DockerForDesktopBackend
case _ => throw new IllegalArgumentException("Invalid " +
CONFIG_KEY_DEPLOY_MODE + ": " + deployMode)


@@ -18,9 +18,10 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.cloud
import java.nio.file.Paths
import io.fabric8.kubernetes.client.utils.Utils
import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient}
import io.fabric8.kubernetes.client.utils.Utils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.deploy.k8s.integrationtest.TestConstants
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.internal.Logging
@@ -38,7 +39,7 @@ private[spark] class KubeConfigBackend(var context: String)
// Auto-configure K8S client from K8S config file
if (Utils.getSystemPropertyOrEnvVar(Config.KUBERNETES_KUBECONFIG_FILE, null: String) == null) {
// Fabric 8 client will automatically assume a default location in this case
logWarning(s"No explicit KUBECONFIG specified, will assume .kube/config under your home directory")
logWarning("No explicit KUBECONFIG specified, will assume $HOME/.kube/config")
}
val config = Config.autoConfigure(context)