diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala index 367cff62cd..2d90c06e36 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala @@ -35,17 +35,16 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => import KubernetesSuite.k8sTestTag - val cName = "ceph-nano" + val cName = "minio" val svcName = s"$cName-s3" - val bucket = "spark" + val BUCKET = "spark" + val ACCESS_KEY = "minio" + val SECRET_KEY = "miniostorage" - private def getCephContainer(): Container = { - val envVars = Map ( "NETWORK_AUTO_DETECT" -> "4", - "RGW_FRONTEND_PORT" -> "8000", - "SREE_PORT" -> "5001", - "CEPH_DEMO_UID" -> "nano", - "CEPH_DAEMON" -> "demo", - "DEBUG" -> "verbose" + private def getMinioContainer(): Container = { + val envVars = Map ( + "MINIO_ACCESS_KEY" -> ACCESS_KEY, + "MINIO_SECRET_KEY" -> SECRET_KEY ).map( envV => new EnvVarBuilder() .withName(envV._1) @@ -59,13 +58,14 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => ).asJava new ContainerBuilder() - .withImage("ceph/daemon:v4.0.3-stable-4.0-nautilus-centos-7-x86_64") + .withImage("minio/minio:latest") .withImagePullPolicy("Always") .withName(cName) + .withArgs("server", "/data") .withPorts(new ContainerPortBuilder() .withName(svcName) .withProtocol("TCP") - .withContainerPort(8000) + .withContainerPort(9000) .build() ) .withResources(new ResourceRequirementsBuilder() @@ -77,10 +77,9 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .build() } - // Based on https://github.com/ceph/cn - private def setupCephStorage(): Unit = { - val labels = Map("app" -> "ceph", "daemon" -> "nano").asJava - val cephService = new ServiceBuilder() + private def setupMinioStorage(): Unit = { + val labels = Map("app" -> "minio").asJava + val minioService = new ServiceBuilder() .withNewMetadata() .withName(svcName) .withLabels(labels) @@ -88,9 +87,9 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .withNewSpec() .withPorts(new ServicePortBuilder() .withName("https") - .withPort(8000) + .withPort(9000) .withProtocol("TCP") - .withTargetPort(new IntOrString(8000)) + .withTargetPort(new IntOrString(9000)) .build() ) .withType("NodePort") @@ -98,7 +97,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .endSpec() .build() - val cephStatefulSet = new StatefulSetBuilder() + val minioStatefulSet = new StatefulSetBuilder() .withNewMetadata() .withName(cName) .withLabels(labels) @@ -106,7 +105,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .withNewSpec() .withReplicas(1) .withNewSelector() - .withMatchLabels(Map("app" -> "ceph").asJava) + .withMatchLabels(Map("app" -> "minio").asJava) .endSelector() .withServiceName(cName) .withNewTemplate() @@ -115,7 +114,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => .withLabels(labels) .endMetadata() .withNewSpec() - .withContainers(getCephContainer()) + .withContainers(getMinioContainer()) .endSpec() .endTemplate() .endSpec() @@ -124,16 +123,16 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => kubernetesTestComponents .kubernetesClient .services() - .create(cephService) + .create(minioService) kubernetesTestComponents .kubernetesClient .apps() .statefulSets() - .create(cephStatefulSet) + .create(minioStatefulSet) } - private def deleteCephStorage(): Unit = { + private def deleteMinioStorage(): Unit = { kubernetesTestComponents .kubernetesClient .apps() @@ -151,47 +150,30 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => test("Launcher client dependencies", k8sTestTag, MinikubeTag) { val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH) try { - setupCephStorage() - val cephUrlStr = getServiceUrl(svcName) - val cephUrl = new URL(cephUrlStr) - val cephHost = cephUrl.getHost - val cephPort = cephUrl.getPort + setupMinioStorage() + val minioUrlStr = getServiceUrl(svcName) + val minioUrl = new URL(minioUrlStr) + val minioHost = minioUrl.getHost + val minioPort = minioUrl.getPort val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir) - val (accessKey, secretKey) = getCephCredentials() sparkAppConf - .set("spark.hadoop.fs.s3a.access.key", accessKey) - .set("spark.hadoop.fs.s3a.secret.key", secretKey) + .set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) + .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") - .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort") - .set("spark.kubernetes.file.upload.path", s"s3a://$bucket") + .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort") + .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET") .set("spark.files", s"$HOST_PATH/$fileName") .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" + "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6") .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp") - createS3Bucket(accessKey, secretKey, cephUrlStr) + createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr) runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar, appArgs = Array(fileName), timeout = Option(DEPS_TIMEOUT)) } finally { // make sure this always runs - deleteCephStorage() - } - } - - // There isn't a cleaner way to get the credentials - // when ceph-nano runs on k8s - private def getCephCredentials(): (String, String) = { - Eventually.eventually(TIMEOUT, INTERVAL) { - val cephPod = kubernetesTestComponents - .kubernetesClient - .pods() - .withName(s"$cName-0") - .get() - implicit val podName: String = cephPod.getMetadata.getName - implicit val components: KubernetesTestComponents = kubernetesTestComponents - val contents = Utils.executeCommand("cat", "/nano_user_details") - (extractS3Key(contents, "access_key"), extractS3Key(contents, "secret_key")) + deleteMinioStorage() } } @@ -211,10 +193,10 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => val credentials = new BasicAWSCredentials(accessKey, secretKey) val s3client = new AmazonS3Client(credentials) s3client.setEndpoint(endPoint) - s3client.createBucket(bucket) + s3client.createBucket(BUCKET) } catch { case e: Exception => - throw new SparkException(s"Failed to create bucket $bucket.", e) + throw new SparkException(s"Failed to create bucket $BUCKET.", e) } } }