[SPARK-27362][K8S] Resource Scheduling support for k8s

## What changes were proposed in this pull request?

Add the ability to map the Spark resource configs `spark.{executor/driver}.resource.{resourceName}` to the Kubernetes container builder so that we request resources (GPUs, FPGAs, etc.) from Kubernetes.
Note that the Spark configs will overwrite any resource configs users put into a pod template.
I added a generic vendor config which is currently only used by Kubernetes. I intentionally didn't put it into the Kubernetes config namespace to avoid adding more config prefixes.

I will add more documentation under JIRA SPARK-27492; I think it will be easier to do it all at once to get a cohesive story.
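
For example, requesting GPUs for each executor could look like this (a sketch; the resource name and vendor value depend on the device plugins installed on the cluster):

```scala
import org.apache.spark.SparkConf

// Hypothetical usage: ask for 2 NVIDIA GPUs per executor on Kubernetes.
// The k8s backend translates these two configs into the container resource
// limit "nvidia.com/gpu: 2", following the device plugin naming convention.
val conf = new SparkConf()
  .set("spark.executor.resource.gpu.count", "2")
  .set("spark.executor.resource.gpu.vendor", "nvidia.com")
```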

## How was this patch tested?

Unit tests and manual testing on a k8s cluster.

Closes #24703 from tgravescs/SPARK-27362.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Commit 1277f8fa92 (parent 2e84181ec3), committed by Thomas Graves on 2019-05-31 15:26:14 -05:00.
10 changed files with 142 additions and 7 deletions.

core/src/main/scala/org/apache/spark/internal/config/package.scala:

```diff
@@ -37,6 +37,7 @@ package object config {
 
   private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
   private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
+  private[spark] val SPARK_RESOURCE_VENDOR_SUFFIX = ".vendor"
 
   private[spark] val DRIVER_RESOURCES_FILE =
     ConfigBuilder("spark.driver.resourcesFile")
```

docs/configuration.md:

```diff
@@ -206,6 +206,16 @@ of the most common options to set are:
     name and an array of addresses.
   </td>
 </tr>
+<tr>
+  <td><code>spark.driver.resource.{resourceName}.vendor</code></td>
+  <td>None</td>
+  <td>
+    Vendor of the resources to use for the driver. This option is currently
+    only supported on Kubernetes and is actually both the vendor and domain following
+    the Kubernetes device plugin naming convention. (e.g. For GPUs on Kubernetes
+    this config would be set to nvidia.com or amd.com)
+  </td>
+</tr>
 <tr>
   <td><code>spark.executor.memory</code></td>
   <td>1g</td>
@@ -259,6 +269,16 @@ of the most common options to set are:
     name and an array of addresses.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.resource.{resourceName}.vendor</code></td>
+  <td>None</td>
+  <td>
+    Vendor of the resources to use for the executors. This option is currently
+    only supported on Kubernetes and is actually both the vendor and domain following
+    the Kubernetes device plugin naming convention. (e.g. For GPUs on Kubernetes
+    this config would be set to nvidia.com or amd.com)
+  </td>
+</tr>
 <tr>
   <td><code>spark.extraListeners</code></td>
   <td>(none)</td>
```

docs/running-on-kubernetes.md:

```diff
@@ -1246,6 +1246,7 @@ The following affect the driver and executor containers. All other containers in
     The cpu limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The cpu is set by
     <code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
     <code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>.
+    Other resource limits are set by <code>spark.{driver,executor}.resources.{resourceName}.*</code> configs.
   </td>
 </tr>
 <tr>
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala:

```diff
@@ -199,4 +199,13 @@ private[spark] object KubernetesConf {
       .replaceAll("[^a-z0-9\\-]", "")
       .replaceAll("-+", "-")
   }
+
+  /**
+   * Build a resources name based on the vendor device plugin naming
+   * convention of: vendor-domain/resource. For example, an NVIDIA GPU is
+   * advertised as nvidia.com/gpu.
+   */
+  def buildKubernetesResourceName(vendorDomain: String, resourceName: String): String = {
+    s"${vendorDomain}/${resourceName}"
+  }
 }
```
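
A quick illustration of the helper's output (not part of the diff; `KubernetesConf` is private to Spark's k8s module):

```scala
KubernetesConf.buildKubernetesResourceName("nvidia.com", "gpu")  // "nvidia.com/gpu"
```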

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala:

```diff
@@ -23,7 +23,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity, QuantityBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{SPARK_RESOURCE_COUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.Utils.getHadoopFileSystem
@@ -216,6 +217,32 @@ private[spark] object KubernetesUtils extends Logging {
     Hex.encodeHexString(random) + time
   }
 
+  /**
+   * This function builds the Quantity objects for each resource in the Spark resource
+   * configs based on the component name (spark.driver.resource or spark.executor.resource).
+   * It assumes we can use the Kubernetes device plugin format: vendor-domain/resource.
+   * It returns a set with a tuple of vendor-domain/resource and Quantity for each resource.
+   */
+  def buildResourcesQuantities(
+      componentName: String,
+      sparkConf: SparkConf): Map[String, Quantity] = {
+    val allResources = sparkConf.getAllWithPrefix(componentName)
+    val vendors = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_VENDOR_SUFFIX).toMap
+    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+    val uniqueResources = SparkConf.getBaseOfConfigs(allResources)
+
+    uniqueResources.map { rName =>
+      val vendorDomain = vendors.get(rName).getOrElse(throw new SparkException("Resource: " +
+        s"$rName was requested, but vendor was not specified."))
+      val amount = amounts.get(rName).getOrElse(throw new SparkException(s"Resource: $rName " +
+        "was requested, but count was not specified."))
+      val quantity = new QuantityBuilder(false)
+        .withAmount(amount)
+        .build()
+      (KubernetesConf.buildKubernetesResourceName(vendorDomain, rName), quantity)
+    }.toMap
+  }
+
   /**
    * Upload files and modify their uris
    */
```
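
For intuition, here is a minimal standalone sketch of the same count/vendor resolution. It is illustrative only: it reimplements the behavior with plain collections rather than the private `SparkConf` helpers used in the patch, and `toDevicePluginLimits` is a hypothetical name.

```scala
// Sketch: collect spark.<component>.resource.<name>.{count,vendor} pairs and
// emit device-plugin limit keys of the form vendor-domain/resource -> count.
def toDevicePluginLimits(
    confs: Map[String, String],
    prefix: String): Map[String, String] = {
  // Keep only the keys under the given component prefix, e.g. "gpu.count" -> "2".
  val resources = confs.collect {
    case (k, v) if k.startsWith(prefix) => (k.stripPrefix(prefix), v)
  }
  // The resource name is everything before the first dot.
  val names = resources.keys.map(_.takeWhile(_ != '.')).toSet
  names.map { r =>
    val vendor = resources.getOrElse(s"$r.vendor", throw new IllegalArgumentException(
      s"Resource: $r was requested, but vendor was not specified."))
    val count = resources.getOrElse(s"$r.count", throw new IllegalArgumentException(
      s"Resource: $r was requested, but count was not specified."))
    (s"$vendor/$r", count)
  }.toMap
}

// Example: yields Map("nvidia.com/gpu" -> "2")
toDevicePluginLimits(
  Map(
    "spark.executor.resource.gpu.count" -> "2",
    "spark.executor.resource.gpu.vendor" -> "nvidia.com"),
  "spark.executor.resource.")
```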

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala:

```diff
@@ -88,6 +88,9 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
     }
 
+    val driverResourceQuantities =
+      KubernetesUtils.buildResourcesQuantities(SPARK_DRIVER_RESOURCE_PREFIX, conf.sparkConf)
+
     val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
     val driverBlockManagerPort = conf.sparkConf.getInt(
       DRIVER_BLOCK_MANAGER_PORT.key,
@@ -129,6 +132,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
         .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
         .addToRequests("memory", driverMemoryQuantity)
         .addToLimits("memory", driverMemoryQuantity)
+        .addToLimits(driverResourceQuantities.asJava)
       .endResources()
       .build()
```
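
To visualize what this step produces, here is a small fabric8 sketch (illustrative only; the container name and values are hypothetical, not taken from the patch):

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, QuantityBuilder}

// With spark.driver.resource.gpu.count=2 and spark.driver.resource.gpu.vendor=nvidia.com,
// the step above effectively adds a "nvidia.com/gpu" limit alongside cpu/memory.
val gpuQuantity = new QuantityBuilder(false).withAmount("2").build()
val container = new ContainerBuilder()
  .withName("spark-kubernetes-driver")  // hypothetical container name
  .withNewResources()
    .addToLimits("nvidia.com/gpu", gpuQuantity)
  .endResources()
  .build()
```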

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala:

```diff
@@ -95,6 +95,10 @@ private[spark] class BasicExecutorFeatureStep(
       .withAmount(executorCoresRequest)
       .build()
 
+    val executorResourceQuantities =
+      KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_RESOURCE_PREFIX,
+        kubernetesConf.sparkConf)
+
     val executorEnv: Seq[EnvVar] = {
       (Seq(
         (ENV_DRIVER_URL, driverUrl),
@@ -168,11 +172,12 @@ private[spark] class BasicExecutorFeatureStep(
         .addToRequests("memory", executorMemoryQuantity)
         .addToLimits("memory", executorMemoryQuantity)
         .addToRequests("cpu", executorCpuQuantity)
+        .addToLimits(executorResourceQuantities.asJava)
         .endResources()
       .addNewEnv()
         .withName(ENV_SPARK_USER)
         .withValue(Utils.getCurrentUserName())
         .endEnv()
       .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
```

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala:

```diff
@@ -24,10 +24,10 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
-import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
@@ -45,6 +45,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   }
 
   test("Check the pod respects all configurations from the user.") {
+    val resources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
     val sparkConf = new SparkConf()
       .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
       .set(DRIVER_CORES, 2)
@@ -53,6 +54,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(DRIVER_MEMORY_OVERHEAD, 200L)
       .set(CONTAINER_IMAGE, "spark-driver:latest")
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
+    resources.foreach { case (_, testRInfo) =>
+      sparkConf.set(
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+      sparkConf.set(
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
     val kubernetesConf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
       labels = DRIVER_LABELS,
@@ -100,6 +109,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val limits = resourceRequirements.getLimits.asScala
     assert(limits("memory").getAmount === "456Mi")
     assert(limits("cpu").getAmount === "4")
+    resources.foreach { case (k8sName, testRInfo) =>
+      assert(limits(k8sName).getAmount === testRInfo.count)
+    }
 
     val driverPodMetadata = configuredPod.pod.getMetadata
     assert(driverPodMetadata.getName === "spark-driver-pod")
```

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala:

```diff
@@ -26,11 +26,13 @@ import com.google.common.net.InternetDomainName
 import io.fabric8.kubernetes.api.model._
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -90,6 +92,58 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       environment = environment)
   }
 
+  test("test spark resource missing vendor") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
+    // test missing vendor
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val error = intercept[SparkException] {
+      val executor = step.configurePod(SparkPod.initialPod())
+    }.getMessage()
+    assert(error.contains("Resource: gpu was requested, but vendor was not specified"))
+  }
+
+  test("test spark resource missing amount") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
+    // test missing count
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val error = intercept[SparkException] {
+      val executor = step.configurePod(SparkPod.initialPod())
+    }.getMessage()
+    assert(error.contains("Resource: gpu was requested, but count was not specified"))
+  }
+
+  test("basic executor pod with resources") {
+    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")),
+      ("foo.com/fpga" -> TestResourceInformation("fpga", "f1", "foo.com")))
+    gpuResources.foreach { case (_, testRInfo) =>
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        testRInfo.count)
+      baseConf.set(
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
+        testRInfo.vendor)
+    }
+    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    assert(executor.container.getResources.getLimits.size() === 3)
+    assert(executor.container.getResources
+      .getLimits.get("memory").getAmount === "1408Mi")
+    gpuResources.foreach { case (k8sName, testRInfo) =>
+      assert(executor.container.getResources.getLimits.get(k8sName).getAmount === testRInfo.count)
+    }
+  }
+
   test("basic executor pod has reasonable defaults") {
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
```

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala:

```diff
@@ -66,4 +66,6 @@ object KubernetesFeaturesTestUtils {
     val desired = implicitly[ClassTag[T]].runtimeClass
     list.filter(_.getClass() == desired).map(_.asInstanceOf[T])
   }
+
+  case class TestResourceInformation(rName: String, count: String, vendor: String)
 }
```