[SPARK-27760][CORE] Spark resources - change user resource config from .count to .amount

## What changes were proposed in this pull request?

Change the resource configs spark.{executor/driver}.resource.{resourceName}.count to .amount so that the value can eventually carry both a count and a unit. Right now we only support counts (e.g. the number of GPUs), but in the future we may want to support units for things like memory (e.g. 25G). Making the user specify a single .amount config is better than making them specify two separate configs, a .count and a .unit. Change it now since it's a user-facing config.

Amount also matches how the Spark on YARN configs are set up.
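To illustrate the rename, here is a minimal sketch of the user-facing configs after this change (the `gpu` resource name and the values are example choices, not part of this patch):

```scala
import org.apache.spark.SparkConf

// Hypothetical application config using the renamed keys; before this
// patch the last segment of each resource key was .count instead of .amount.
val conf = new SparkConf()
  .setAppName("resource-demo")
  // each executor requests 2 of the "gpu" resource
  .set("spark.executor.resource.gpu.amount", "2")
  // each task needs 1 gpu, so up to 2 tasks can run per executor
  .set("spark.task.resource.gpu.amount", "1")
```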

## How was this patch tested?

Unit tests, plus manual verification on YARN and in local-cluster mode.

Closes #24810 from tgravescs/SPARK-27760-amount.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Commit d30284b5a5 (parent eadb53824d)
Thomas Graves, 2019-06-06 14:16:05 -05:00, committed by Thomas Graves
19 changed files with 93 additions and 69 deletions

@@ -512,8 +512,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   */
  private[spark] def getTaskResourceRequirements(): Map[String, Int] = {
    getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
-      .withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_SUFFIX)}
-      .map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_SUFFIX.length), v.toInt)}.toMap
+      .withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_AMOUNT_SUFFIX)}
+      .map { case (k, v) => (k.dropRight(SPARK_RESOURCE_AMOUNT_SUFFIX.length), v.toInt)}.toMap
  }
/**

@@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging {
    }
    // verify the resources we discovered are what the user requested
    val driverReqResourcesAndCounts =
-      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
    ResourceDiscoverer.checkActualResourcesMeetRequirements(driverReqResourcesAndCounts, _resources)
    logInfo("===============================================================================")
@@ -2725,7 +2725,7 @@ object SparkContext extends Logging {
    // executor and resources required by each task.
    val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
    val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
-      SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+      SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
    var numSlots = execCores / taskCores
    var limitingResourceName = "CPU"
    taskResourcesAndCount.foreach { case (rName, taskCount) =>
@@ -2733,17 +2733,17 @@
      val execCount = executorResourcesAndCounts.getOrElse(rName,
        throw new SparkException(
          s"The executor resource config: " +
-            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} " +
+            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
            "needs to be specified since a task requirement config: " +
-            s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} was specified")
+            s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} was specified")
      )
      // Make sure the executor resources are large enough to launch at least one task.
      if (execCount.toLong < taskCount.toLong) {
        throw new SparkException(
          s"The executor resource config: " +
-            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} " +
+            s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
            s"= $execCount has to be >= the task config: " +
-            s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} = $taskCount")
+            s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount")
      }
      // Compare and update the max slots each executor can provide.
      val resourceNumSlots = execCount.toInt / taskCount

@@ -319,7 +319,7 @@ private[spark] object TestUtils {
      conf: SparkConf,
      resourceName: String,
      resourceCount: Int): SparkConf = {
-    val key = s"${SPARK_TASK_RESOURCE_PREFIX}${resourceName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+    val key = s"${SPARK_TASK_RESOURCE_PREFIX}${resourceName}${SPARK_RESOURCE_AMOUNT_SUFFIX}"
    conf.set(key, resourceCount.toString)
  }
}

@@ -106,7 +106,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    }
    val execReqResourcesAndCounts =
      env.conf.getAllWithPrefixAndSuffix(SPARK_EXECUTOR_RESOURCE_PREFIX,
-        SPARK_RESOURCE_COUNT_SUFFIX).toMap
+        SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
    ResourceDiscoverer.checkActualResourcesMeetRequirements(execReqResourcesAndCounts,
      actualExecResources)

@@ -35,7 +35,7 @@ package object config {
  private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource."
  private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
-  private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
+  private[spark] val SPARK_RESOURCE_AMOUNT_SUFFIX = ".amount"
  private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
  private[spark] val SPARK_RESOURCE_VENDOR_SUFFIX = ".vendor"

@@ -224,7 +224,7 @@ class ResourceDiscovererSuite extends SparkFunSuite
  test("gpu's specified but not discovery script") {
    val sparkconf = new SparkConf
    sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-      SPARK_RESOURCE_COUNT_SUFFIX, "2")
+      SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
    val error = intercept[SparkException] {
      ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX)

@@ -450,16 +450,16 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
  test("get task resource requirement from config") {
    val conf = new SparkConf()
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
    var taskResourceRequirement = conf.getTaskResourceRequirements()
    assert(taskResourceRequirement.size == 2)
    assert(taskResourceRequirement(GPU) == 2)
    assert(taskResourceRequirement(FPGA) == 1)
-    conf.remove(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX)
+    conf.remove(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX)
    // Ignore invalid prefix
-    conf.set("spark.invalid.prefix" + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set("spark.invalid.prefix" + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
    taskResourceRequirement = conf.getTaskResourceRequirements()
    assert(taskResourceRequirement.size == 1)
    assert(taskResourceRequirement.get(FPGA).isEmpty)

@@ -747,7 +747,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
    val conf = new SparkConf()
      .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
      .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
      .setMaster("local-cluster[1, 1, 1024]")
@@ -784,7 +784,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
    val conf = new SparkConf()
      .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
      .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
      .set(DRIVER_RESOURCES_FILE, resourcesFile)
@@ -806,7 +806,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
  test("Test parsing resources task configs with missing executor config") {
    val conf = new SparkConf()
      .set(SPARK_TASK_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
      .setMaster("local-cluster[1, 1, 1024]")
      .setAppName("test-cluster")
@@ -814,17 +814,17 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
      sc = new SparkContext(conf)
    }.getMessage()
-    assert(error.contains("The executor resource config: spark.executor.resource.gpu.count " +
-      "needs to be specified since a task requirement config: spark.task.resource.gpu.count " +
+    assert(error.contains("The executor resource config: spark.executor.resource.gpu.amount " +
+      "needs to be specified since a task requirement config: spark.task.resource.gpu.amount " +
      "was specified"))
  }
  test("Test parsing resources executor config < task requirements") {
    val conf = new SparkConf()
      .set(SPARK_TASK_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "2")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_COUNT_SUFFIX, "1")
+        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
      .setMaster("local-cluster[1, 1, 1024]")
      .setAppName("test-cluster")
@@ -833,14 +833,14 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
    }.getMessage()
    assert(error.contains("The executor resource config: " +
-      "spark.executor.resource.gpu.count = 1 has to be >= the task config: " +
-      "spark.task.resource.gpu.count = 2"))
+      "spark.executor.resource.gpu.amount = 1 has to be >= the task config: " +
+      "spark.task.resource.gpu.amount = 2"))
  }
  test("Parse resources executor config not the same multiple numbers of the task requirements") {
    val conf = new SparkConf()
-      .set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "4")
+      .set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "4")
      .setMaster("local-cluster[1, 1, 1024]")
      .setAppName("test-cluster")
@@ -873,7 +873,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
    val discoveryScript = resourceFile.getPath()
    val conf = new SparkConf()
-      .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+      .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
      .set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX}",
        discoveryScript)
      .setMaster("local-cluster[3, 3, 1024]")

@@ -57,7 +57,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
  test("parsing no resources") {
    val conf = new SparkConf
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
    val serializer = new JavaSerializer(conf)
    val env = createMockEnv(conf, serializer)
@@ -79,8 +79,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
  test("parsing one resources") {
    val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
    val serializer = new JavaSerializer(conf)
    val env = createMockEnv(conf, serializer)
    // we don't really use this, just need it to get at the parser function
@@ -103,10 +103,10 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
  test("parsing multiple resources") {
    val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
    val serializer = new JavaSerializer(conf)
    val env = createMockEnv(conf, serializer)
    // we don't really use this, just need it to get at the parser function
@@ -136,8 +136,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
  test("error checking parsing resources and executor and task configs") {
    val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "2")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
    val serializer = new JavaSerializer(conf)
    val env = createMockEnv(conf, serializer)
    // we don't really use this, just need it to get at the parser function
@@ -178,8 +178,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
  test("executor resource found less than required") {
    val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "4")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "1")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "4")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
    val serializer = new JavaSerializer(conf)
    val env = createMockEnv(conf, serializer)
    // we don't really use this, just need it to get at the parser function
@@ -204,8 +204,8 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
  test("use discoverer") {
    val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_COUNT_SUFFIX, "3")
+    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
+    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
    assume(!(Utils.isWindows))
    withTempDir { dir =>
      val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga")

@@ -187,7 +187,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
    val conf = new SparkConf()
      .set(EXECUTOR_CORES, 3)
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_COUNT_SUFFIX, "3")
+      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
      .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
      .setMaster(
        "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")

@@ -1251,9 +1251,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
    val executorCpus = 4
    val taskScheduler = setupScheduler(numCores = executorCpus,
      config.CPUS_PER_TASK.key -> taskCpus.toString,
-      s"${config.SPARK_TASK_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_COUNT_SUFFIX}" ->
+      s"${config.SPARK_TASK_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}" ->
        taskGpus.toString,
-      s"${config.SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_COUNT_SUFFIX}" ->
+      s"${config.SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}" ->
        executorGpus.toString,
      config.EXECUTOR_CORES.key -> executorCpus.toString)
    val taskSet = FakeTask.createTaskSet(3)

@@ -194,10 +194,10 @@ of the most common options to set are:
  </td>
</tr>
<tr>
-  <td><code>spark.driver.resource.{resourceName}.count</code></td>
+  <td><code>spark.driver.resource.{resourceName}.amount</code></td>
  <td>0</td>
  <td>
-    The number of a particular resource type to use on the driver.
+    Amount of a particular resource type to use on the driver.
    If this is used, you must also specify the
    <code>spark.driver.resource.{resourceName}.discoveryScript</code>
    for the driver to find the resource on startup.
@@ -264,10 +264,10 @@ of the most common options to set are:
  </td>
</tr>
<tr>
-  <td><code>spark.executor.resource.{resourceName}.count</code></td>
+  <td><code>spark.executor.resource.{resourceName}.amount</code></td>
  <td>0</td>
  <td>
-    The number of a particular resource type to use per executor process.
+    Amount of a particular resource type to use per executor process.
    If this is used, you must also specify the
    <code>spark.executor.resource.{resourceName}.discoveryScript</code>
    for the executor to find the resource on startup.
@@ -1888,11 +1888,11 @@ Apart from these, the following properties are also available, and may be useful
  </td>
</tr>
<tr>
-  <td><code>spark.task.resource.{resourceName}.count</code></td>
+  <td><code>spark.task.resource.{resourceName}.amount</code></td>
  <td>1</td>
  <td>
-    Number of a particular resource type to allocate for each task. If this is specified
-    you must also provide the executor config <code>spark.executor.resource.{resourceName}.count</code>
+    Amount of a particular resource type to allocate for each task. If this is specified
+    you must also provide the executor config <code>spark.executor.resource.{resourceName}.amount</code>
    and any corresponding discovery configs so that your executors are created with that resource type.
  </td>
</tr>

@@ -32,7 +32,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{SPARK_RESOURCE_COUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
+import org.apache.spark.internal.config.{SPARK_RESOURCE_AMOUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.Utils.getHadoopFileSystem
@@ -228,7 +228,7 @@ private[spark] object KubernetesUtils extends Logging {
      sparkConf: SparkConf): Map[String, Quantity] = {
    val allResources = sparkConf.getAllWithPrefix(componentName)
    val vendors = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_VENDOR_SUFFIX).toMap
-    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_COUNT_SUFFIX).toMap
+    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
    val uniqueResources = SparkConf.getBaseOfConfigs(allResources)
    uniqueResources.map { rName =>

@@ -56,7 +56,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
    resources.foreach { case (_, testRInfo) =>
      sparkConf.set(
-        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        testRInfo.count)
      sparkConf.set(
        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",

@@ -97,7 +97,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
    // test missing vendor
    gpuResources.foreach { case (_, testRInfo) =>
      baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        testRInfo.count)
    }
    val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
@@ -127,7 +127,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
      ("foo.com/fpga" -> TestResourceInformation("fpga", "f1", "foo.com")))
    gpuResources.foreach { case (_, testRInfo) =>
      baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        testRInfo.count)
      baseConf.set(
        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",

@@ -68,13 +68,13 @@ private object ResourceRequestHelper extends Logging {
      (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
      (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
      (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
-      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
-      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
-      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu{SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
-      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}",
        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))
    val errorMessage = new mutable.StringBuilder()

@@ -55,7 +55,7 @@ object YarnSparkHadoopUtil {
    ): Map[String, String] = {
    Map("gpu" -> YARN_GPU_RESOURCE_CONFIG, "fpga" -> YARN_FPGA_RESOURCE_CONFIG).map {
      case (rName, yarnName) =>
-        val resourceCountSparkConf = s"${confPrefix}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+        val resourceCountSparkConf = s"${confPrefix}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}"
        (yarnName -> sparkConf.get(resourceCountSparkConf, "0"))
    }.filter { case (_, count) => count.toLong > 0 }
  }

@@ -390,7 +390,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
    }
  }
-  test(s"custom resource request yarn config and spark config fails") {
+  test("custom driver resource request yarn config and spark config fails") {
    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
@@ -400,7 +400,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
      conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
    }
    resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
    }
    val error = intercept[SparkException] {
@@ -408,12 +408,36 @@ class ClientSuite extends SparkFunSuite with Matchers {
    }.getMessage()
    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," +
-      " please use spark.driver.resource.fpga"))
+      " please use spark.driver.resource.fpga.amount"))
    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," +
-      " please use spark.driver.resource.gpu"))
+      " please use spark.driver.resource.gpu.amount"))
  }
-  test(s"custom resources spark config mapped to yarn config") {
+  test("custom executor resource request yarn config and spark config fails") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.keys.foreach { yarnName =>
+      conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+    }
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+    }
+    val error = intercept[SparkException] {
+      ResourceRequestHelper.validateResources(conf)
+    }.getMessage()
+    assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga," +
+      " please use spark.executor.resource.fpga.amount"))
+    assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu," +
+      " please use spark.executor.resource.gpu.amount"))
+  }
+  test("custom resources spark config mapped to yarn config") {
    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
    val yarnMadeupResource = "yarn.io/madeup"
    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu",
@@ -423,7 +447,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
    resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
    }
    // also just set yarn one that we don't convert
    conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")

@@ -189,8 +189,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
    val sparkResources =
-      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}" -> "3",
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}" -> "2",
+      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "3",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "2",
        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
    val handler = createAllocator(1, mockAmClient, sparkResources)