From 1277f8fa92da85d9e39d9146e3099fcb75c71a8f Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Fri, 31 May 2019 15:26:14 -0500
Subject: [PATCH] [SPARK-27362][K8S] Resource Scheduling support for k8s

## What changes were proposed in this pull request?

Add the ability to map the Spark resource configs
spark.{executor/driver}.resource.{resourceName} to the Kubernetes container
builder, so that we request resources (GPUs/FPGAs/etc.) from Kubernetes.
Note that the Spark configs will overwrite any resource configs users put
into a pod template.

I added a generic vendor config which is only used by Kubernetes right now.
I intentionally didn't put it into the Kubernetes config namespace, just to
avoid adding more config prefixes.

I will add more documentation for this under JIRA SPARK-27492. I think it
will be easier to do it all at once to get a cohesive story.

## How was this patch tested?

Unit tests and manual testing on a k8s cluster.

Closes #24703 from tgravescs/SPARK-27362.

Authored-by: Thomas Graves
Signed-off-by: Thomas Graves
---
 .../spark/internal/config/package.scala       |  1 +
 docs/configuration.md                         | 20 +++++++
 docs/running-on-kubernetes.md                 |  1 +
 .../spark/deploy/k8s/KubernetesConf.scala     |  9 +++
 .../spark/deploy/k8s/KubernetesUtils.scala    | 29 +++++++++-
 .../k8s/features/BasicDriverFeatureStep.scala |  4 ++
 .../features/BasicExecutorFeatureStep.scala   | 13 +++--
 .../BasicDriverFeatureStepSuite.scala         | 14 ++++-
 .../BasicExecutorFeatureStepSuite.scala       | 56 ++++++++++++++++++-
 .../KubernetesFeaturesTestUtils.scala         |  2 +
 10 files changed, 142 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a5d36b590d..8ea88878c4 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -37,6 +37,7 @@ package object config {
 
   private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
   private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
+  private[spark] val SPARK_RESOURCE_VENDOR_SUFFIX = ".vendor"
 
   private[spark] val DRIVER_RESOURCES_FILE =
     ConfigBuilder("spark.driver.resourcesFile")
diff --git a/docs/configuration.md b/docs/configuration.md
index 216995162d..24e66e1ce1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -206,6 +206,16 @@ of the most common options to set are:
     name and an array of addresses.
   </td>
 </tr>
+<tr>
+  <td><code>spark.driver.resource.{resourceName}.vendor</code></td>
+  <td>None</td>
+  <td>
+    Vendor of the resources to use for the driver. This option is currently
+    only supported on Kubernetes and is actually both the vendor and domain following
+    the Kubernetes device plugin naming convention. (e.g. For GPUs on Kubernetes
+    this config would be set to nvidia.com or amd.com)
+  </td>
+</tr>
 <tr>
   <td><code>spark.executor.memory</code></td>
   <td>1g</td>
@@ -259,6 +269,16 @@ of the most common options to set are:
     name and an array of addresses.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.resource.{resourceName}.vendor</code></td>
+  <td>None</td>
+  <td>
+    Vendor of the resources to use for the executors. This option is currently
+    only supported on Kubernetes and is actually both the vendor and domain following
+    the Kubernetes device plugin naming convention. (e.g. For GPUs on Kubernetes
+    this config would be set to nvidia.com or amd.com)
+  </td>
+</tr>
 <tr>
   <td><code>spark.extraListeners</code></td>
   <td>(none)</td>
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 8a424b57fe..d4efb52e0f 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1246,6 +1246,7 @@ The following affect the driver and executor containers. All other containers in
   <td>
     The cpu limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The cpu is set by
     <code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
     <code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>.
+    Other resource limits are set by <code>spark.{driver,executor}.resource.{resourceName}.*</code> configs.
   </td>
 </tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 5e741112fc..a2a46614fb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -199,4 +199,13 @@ private[spark] object KubernetesConf {
       .replaceAll("[^a-z0-9\\-]", "")
       .replaceAll("-+", "-")
   }
+
+  /**
+   * Build a resource name based on the vendor device plugin naming
+   * convention of: vendor-domain/resource. For example, an NVIDIA GPU is
+   * advertised as nvidia.com/gpu.
+   */
+  def buildKubernetesResourceName(vendorDomain: String, resourceName: String): String = {
+    s"${vendorDomain}/${resourceName}"
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index a5710357fd..522c8f74b9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -23,7 +23,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity, QuantityBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{SPARK_RESOURCE_COUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.Utils.getHadoopFileSystem
@@ -216,6 +217,32 @@ private[spark] object KubernetesUtils extends Logging {
     Hex.encodeHexString(random) + time
   }
 
+  /**
+   * This function builds the Quantity objects for each resource in the Spark resource
+   * configs based on the component name (spark.driver.resource or spark.executor.resource).
+   * It assumes we can use the Kubernetes device plugin format: vendor-domain/resource.
+   * It returns a map of vendor-domain/resource to Quantity, with one entry per resource.
+   */
+  def buildResourcesQuantities(
+      componentName: String,
+      sparkConf: SparkConf): Map[String, Quantity] = {
+    val allResources = sparkConf.getAllWithPrefix(componentName)
+    val vendors = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_VENDOR_SUFFIX).toMap
+    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+    val uniqueResources = SparkConf.getBaseOfConfigs(allResources)
+
+    uniqueResources.map { rName =>
+      val vendorDomain = vendors.get(rName).getOrElse(throw new SparkException("Resource: " +
+        s"$rName was requested, but vendor was not specified."))
+      val amount = amounts.get(rName).getOrElse(throw new SparkException(s"Resource: $rName " +
+        "was requested, but count was not specified."))
+      val quantity = new QuantityBuilder(false)
+        .withAmount(amount)
+        .build()
+      (KubernetesConf.buildKubernetesResourceName(vendorDomain, rName), quantity)
+    }.toMap
+  }
+
   /**
    * Upload files and modify their uris
    */
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 92463df0f6..d10f69f8ba 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -88,6 +88,9 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
   }
 
+  val driverResourceQuantities =
+    KubernetesUtils.buildResourcesQuantities(SPARK_DRIVER_RESOURCE_PREFIX, conf.sparkConf)
+
   val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
   val driverBlockManagerPort = conf.sparkConf.getInt(
     DRIVER_BLOCK_MANAGER_PORT.key,
@@ -129,6 +132,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
       .addToRequests("memory", driverMemoryQuantity)
      .addToLimits("memory", driverMemoryQuantity)
+      .addToLimits(driverResourceQuantities.asJava)
       .endResources()
     .build()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index af162839fe..d46a9b883c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -95,6 +95,10 @@ private[spark] class BasicExecutorFeatureStep(
       .withAmount(executorCoresRequest)
       .build()
 
+    val executorResourceQuantities =
+      KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_RESOURCE_PREFIX,
+        kubernetesConf.sparkConf)
+
     val executorEnv: Seq[EnvVar] = {
       (Seq(
         (ENV_DRIVER_URL, driverUrl),
@@ -168,11 +172,12 @@ private[spark] class BasicExecutorFeatureStep(
         .addToRequests("memory", executorMemoryQuantity)
         .addToLimits("memory", executorMemoryQuantity)
         .addToRequests("cpu", executorCpuQuantity)
+        .addToLimits(executorResourceQuantities.asJava)
         .endResources()
-      .addNewEnv()
-        .withName(ENV_SPARK_USER)
-        .withValue(Utils.getCurrentUserName())
-        .endEnv()
+        .addNewEnv()
+          .withName(ENV_SPARK_USER)
+          .withValue(Utils.getCurrentUserName())
+          .endEnv()
       .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 92f46c6747..f60c6fbbf9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -24,10 +24,10 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
-import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
@@ -45,6 +45,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   }
 
   test("Check the pod respects all configurations from the user.") {
+    val resources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
     val sparkConf = new SparkConf()
       .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
       .set(DRIVER_CORES, 2)
@@ -53,6 +54,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(DRIVER_MEMORY_OVERHEAD, 200L)
       .set(CONTAINER_IMAGE, "spark-driver:latest")
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
+    resources.foreach { case (_, testRInfo) =>
+      sparkConf.set(
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+      sparkConf.set(
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
     val kubernetesConf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
       labels = DRIVER_LABELS,
@@ -100,6 +109,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val limits = resourceRequirements.getLimits.asScala
     assert(limits("memory").getAmount === "456Mi")
     assert(limits("cpu").getAmount === "4")
+    resources.foreach { case (k8sName, testRInfo) =>
+      assert(limits(k8sName).getAmount === testRInfo.count)
+    }
 
     val driverPodMetadata = configuredPod.pod.getMetadata
     assert(driverPodMetadata.getName === "spark-driver-pod")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 93268c6c31..3e892a9127 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -26,11 +26,13 @@ import com.google.common.net.InternetDomainName
 import io.fabric8.kubernetes.api.model._
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -90,6 +92,58 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       environment = environment)
   }
 
+  test("test spark resource missing vendor") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
+    // test missing vendor
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val error = intercept[SparkException] {
+      val executor = step.configurePod(SparkPod.initialPod())
+    }.getMessage()
+    assert(error.contains("Resource: gpu was requested, but vendor was not specified"))
+  }
+
+  test("test spark resource missing amount") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
+    // test missing count
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val error = intercept[SparkException] {
+      val executor = step.configurePod(SparkPod.initialPod())
+    }.getMessage()
+    assert(error.contains("Resource: gpu was requested, but count was not specified"))
+  }
+
+  test("basic executor pod with resources") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")),
+      ("foo.com/fpga" -> TestResourceInformation("fpga", "f1", "foo.com")))
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    assert(executor.container.getResources.getLimits.size() === 3)
+    assert(executor.container.getResources
+      .getLimits.get("memory").getAmount === "1408Mi")
+    gpuResources.foreach { case (k8sName, testRInfo) =>
+      assert(executor.container.getResources.getLimits.get(k8sName).getAmount === testRInfo.count)
+    }
+  }
+
   test("basic executor pod has reasonable defaults") {
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
index b0604ea888..e8be3b0b6e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -66,4 +66,6 @@ object KubernetesFeaturesTestUtils {
     val desired = implicitly[ClassTag[T]].runtimeClass
     list.filter(_.getClass() == desired).map(_.asInstanceOf[T])
   }
+
+  case class TestResourceInformation(rName: String, count: String, vendor: String)
 }
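
As a usage sketch (not part of the committed patch): with the configs this change wires through, an application would request GPUs on Kubernetes roughly as follows. The master URL and container image below are hypothetical placeholders; only the resource config keys come from this patch.

```scala
import org.apache.spark.SparkConf

// Sketch: master URL and container image are hypothetical placeholders.
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.example.com:6443")
  .set("spark.kubernetes.container.image", "my-spark:latest")
  // One GPU per executor and for the driver. The vendor value follows the
  // Kubernetes device plugin naming convention, so these become the pod
  // resource limit "nvidia.com/gpu".
  .set("spark.executor.resource.gpu.count", "1")
  .set("spark.executor.resource.gpu.vendor", "nvidia.com")
  .set("spark.driver.resource.gpu.count", "1")
  .set("spark.driver.resource.gpu.vendor", "nvidia.com")
```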
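The core of buildResourcesQuantities is a small mapping, sketched standalone below; plain Maps stand in for the SparkConf suffix helpers, which is an assumption made for brevity.

```scala
// Simplified sketch: counts and vendors keyed by resource name, e.g.
// counts = Map("gpu" -> "2"), vendors = Map("gpu" -> "nvidia.com").
def toDevicePluginLimits(
    counts: Map[String, String],
    vendors: Map[String, String]): Map[String, String] = {
  counts.map { case (rName, amount) =>
    val vendorDomain = vendors.getOrElse(rName,
      sys.error(s"Resource: $rName was requested, but vendor was not specified."))
    // Device plugin convention: vendor-domain/resource, e.g. "nvidia.com/gpu".
    s"$vendorDomain/$rName" -> amount
  }
}

// toDevicePluginLimits(Map("gpu" -> "2"), Map("gpu" -> "nvidia.com"))
//   returns Map("nvidia.com/gpu" -> "2")
```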