[SPARK-27024] Executor interface for cluster managers to support GPU and other resources

## What changes were proposed in this pull request?

Add in GPU and generic resource type allocation to the executors.

Note this is part of a bigger feature for GPU-aware scheduling and covers only how the executor finds the resources. The general flow:

- Users ask for a certain set of resources, for instance a number of GPUs; each cluster manager has a specific way to do this.
- The cluster manager allocates a container or a set of resources (standalone mode).
- When Spark launches the executor in that container, the executor either has to be told what resources it has or it has to discover them itself.
- The executor has to register with the driver and report the set of resources it has, so the scheduler can use that to schedule tasks that require a certain amount of each of those resources (the extended registration message is sketched below).
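
The last step maps onto the `RegisterExecutor` message extended by this patch; a condensed sketch of its new shape (field names taken from the diff further down):

```scala
// Condensed from the CoarseGrainedClusterMessages change in this commit (the extends clause
// on the sealed message trait is omitted): registration now carries the discovered resources
// as a map of resource name -> ResourceInformation.
case class RegisterExecutor(
    executorId: String,
    executorRef: RpcEndpointRef,
    hostname: String,
    cores: Int,
    logUrls: Map[String, String],
    attributes: Map[String, String],
    resources: Map[String, ResourceInformation])
```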

In this PR I added configs and arguments to the executor so it can discover its resources. The executor argument is intended for standalone mode, or for other cluster managers without isolation, so that specific resources can be assigned to specific executors when there are multiple executors on a node. The argument is a file containing a JSON array of ResourceInformation objects.

The discovery script is meant to be used in an isolated environment where the executor only sees the resources it should use.
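
As a minimal illustration of the new configs (the keys come from this PR; the counts and the discovery-script path below are made up for the example):

```scala
import org.apache.spark.SparkConf

// Sketch only: the config keys are the ones added in this PR; the values and the
// script path (/path/to/getGpusResources.sh) are hypothetical.
val conf = new SparkConf()
  .set("spark.executor.resource.gpu.count", "2")
  .set("spark.executor.resource.gpu.discoveryScript", "/path/to/getGpusResources.sh")
  .set("spark.task.resource.gpu.count", "1")
```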

Note that there will be follow-on PRs to add the other pieces, such as the scheduler part. See the high-level epic JIRA: https://issues.apache.org/jira/browse/SPARK-24615

## How was this patch tested?

Added unit tests and manually tested.


Closes #24406 from tgravescs/gpu-sched-executor-clean.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Committed by Thomas Graves on 2019-05-14 08:41:41 -05:00
parent 66f5a42ca5, commit db2e3c4341
14 changed files with 749 additions and 19 deletions

View file

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.io.File
import com.fasterxml.jackson.core.JsonParseException
import org.json4s.{DefaultFormats, MappingException}
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils.executeAndGetOutput
/**
* Discovers resources (GPUs/FPGAs/etc). It currently only supports resources that have
* addresses.
* This class finds resources by running and parsing the output of the user specified script
* from the config spark.{driver/executor}.resource.{resourceType}.discoveryScript.
* The output of the script it runs is expected to be JSON in the format of the
* ResourceInformation class.
*
* For example: {"name": "gpu", "addresses": ["0","1"]}
*/
private[spark] object ResourceDiscoverer extends Logging {
private implicit val formats = DefaultFormats
def findResources(sparkConf: SparkConf, isDriver: Boolean): Map[String, ResourceInformation] = {
val prefix = if (isDriver) {
SPARK_DRIVER_RESOURCE_PREFIX
} else {
SPARK_EXECUTOR_RESOURCE_PREFIX
}
// get unique resource types by grabbing first part config with multiple periods,
// ie resourceType.count, grab resourceType part
val resourceNames = sparkConf.getAllWithPrefix(prefix).map { case (k, _) =>
k.split('.').head
}.toSet
resourceNames.map { rName => {
val rInfo = getResourceInfoForType(sparkConf, prefix, rName)
(rName -> rInfo)
}}.toMap
}
private def getResourceInfoForType(
sparkConf: SparkConf,
prefix: String,
resourceType: String): ResourceInformation = {
val discoveryConf = prefix + resourceType + SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX
val script = sparkConf.getOption(discoveryConf)
val result = if (script.nonEmpty) {
val scriptFile = new File(script.get)
// check that script exists and try to execute
if (scriptFile.exists()) {
try {
val output = executeAndGetOutput(Seq(script.get), new File("."))
val parsedJson = parse(output)
val name = (parsedJson \ "name").extract[String]
val addresses = (parsedJson \ "addresses").extract[Array[String]].toArray
new ResourceInformation(name, addresses)
} catch {
case e @ (_: SparkException | _: MappingException | _: JsonParseException) =>
throw new SparkException(s"Error running the resource discovery script: $scriptFile" +
s" for $resourceType", e)
}
} else {
throw new SparkException(s"Resource script: $scriptFile to discover $resourceType" +
s" doesn't exist!")
}
} else {
throw new SparkException(s"User is expecting to use $resourceType resources but " +
s"didn't specify a script via conf: $discoveryConf, to find them!")
}
result
}
}
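
For illustration only (not part of the file above, and closely following the test suite further down in this commit): a discovery script is simply an executable that prints one ResourceInformation JSON object to stdout, and `findResources` runs it for every resource type configured with a `.discoveryScript` key. A minimal sketch:

```scala
// Sketch under the assumption that this snippet lives in the org.apache.spark package,
// since ResourceDiscoverer is private[spark]. The script path and the GPU addresses are
// made up for illustration.
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files => JavaFiles}
import java.nio.file.attribute.PosixFilePermission._
import java.util.EnumSet
import com.google.common.io.Files

val script = new File("/tmp/getGpusResources.sh") // hypothetical script location
Files.write("""echo '{"name": "gpu", "addresses": ["0", "1"]}'""", script, StandardCharsets.UTF_8)
JavaFiles.setPosixFilePermissions(script.toPath, EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))

val conf = new SparkConf().set("spark.executor.resource.gpu.discoveryScript", script.getPath)
val resources = ResourceDiscoverer.findResources(conf, isDriver = false)
// resources("gpu").addresses would be Array("0", "1")
```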

View file

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import org.apache.spark.annotation.Evolving
/**
* Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc.
* The array of addresses are resource specific and its up to the user to interpret the address.
*
* One example is GPUs, where the addresses would be the indices of the GPUs
*
* @param name the name of the resource
* @param addresses an array of strings describing the addresses of the resource
*/
@Evolving
class ResourceInformation(
val name: String,
val addresses: Array[String]) extends Serializable {
override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]"
}
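
A short usage note (illustrative, not part of the patch): constructing and printing one of these, with made-up addresses.

```scala
import org.apache.spark.ResourceInformation

// Prints "[name: gpu, addresses: 0,1]" per the toString above.
val gpuInfo = new ResourceInformation("gpu", Array("0", "1"))
println(gpuInfo)
```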

View file

@ -17,6 +17,7 @@
package org.apache.spark.executor
import java.io.{BufferedInputStream, FileInputStream}
import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
@ -26,12 +27,18 @@ import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import com.fasterxml.jackson.databind.exc.MismatchedInputException
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JArray
import org.json4s.MappingException
import org.json4s.jackson.JsonMethods._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.EXECUTOR_ID
import org.apache.spark.internal.config._
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@ -45,9 +52,12 @@ private[spark] class CoarseGrainedExecutorBackend(
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
env: SparkEnv,
resourcesFile: Option[String])
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
private implicit val formats = DefaultFormats
private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None
@ -58,11 +68,12 @@ private[spark] class CoarseGrainedExecutorBackend(
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
val resources = parseOrFindResources(resourcesFile)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractAttributes))
extractAttributes, resources))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
@ -72,6 +83,97 @@ private[spark] class CoarseGrainedExecutorBackend(
}(ThreadUtils.sameThread)
}
// Check that the actual resources discovered will satisfy the user specified
// requirements and that they match the configs specified by the user to catch
// mismatches between what the user requested and what resource manager gave or
// what the discovery script found.
private def checkResourcesMeetRequirements(
resourceConfigPrefix: String,
reqResourcesAndCounts: Array[(String, String)],
actualResources: Map[String, ResourceInformation]): Unit = {
reqResourcesAndCounts.foreach { case (rName, reqCount) =>
if (actualResources.contains(rName)) {
val resourceInfo = actualResources(rName)
if (resourceInfo.addresses.size < reqCount.toLong) {
throw new SparkException(s"Resource: $rName with addresses: " +
s"${resourceInfo.addresses.mkString(",")} doesn't meet the " +
s"requirements of needing $reqCount of them")
}
// also make sure the resource count on start matches the
// resource configs specified by user
val userCountConfigName =
resourceConfigPrefix + rName + SPARK_RESOURCE_COUNT_POSTFIX
val userConfigCount = env.conf.getOption(userCountConfigName).
getOrElse(throw new SparkException(s"Resource: $rName not specified " +
s"via config: $userCountConfigName, but required, " +
"please fix your configuration"))
if (userConfigCount.toLong > resourceInfo.addresses.size) {
throw new SparkException(s"Resource: $rName, with addresses: " +
s"${resourceInfo.addresses.mkString(",")} " +
s"is less than what the user requested for count: $userConfigCount, " +
s"via $userCountConfigName")
}
} else {
throw new SparkException(s"Executor resource config missing required task resource: $rName")
}
}
}
// visible for testing
def parseOrFindResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = {
// only parse the resources if a task requires them
val taskResourceConfigs = env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
val resourceInfo = if (taskResourceConfigs.nonEmpty) {
val execResources = resourcesFile.map { resourceFileStr => {
val source = new BufferedInputStream(new FileInputStream(resourceFileStr))
val resourceMap = try {
val parsedJson = parse(source).asInstanceOf[JArray].arr
parsedJson.map { json =>
val name = (json \ "name").extract[String]
val addresses = (json \ "addresses").extract[Array[String]]
new ResourceInformation(name, addresses)
}.map(x => (x.name -> x)).toMap
} catch {
case e @ (_: MappingException | _: MismatchedInputException) =>
throw new SparkException(
s"Exception parsing the resources in $resourceFileStr", e)
} finally {
source.close()
}
resourceMap
}}.getOrElse(ResourceDiscoverer.findResources(env.conf, isDriver = false))
if (execResources.isEmpty) {
throw new SparkException("User specified resources per task via: " +
s"$SPARK_TASK_RESOURCE_PREFIX, but can't find any resources available on the executor.")
}
// get just the map of resource name to count
val resourcesAndCounts = taskResourceConfigs.
withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_COUNT_POSTFIX)}.
map { case (k, v) => (k.dropRight(SPARK_RESOURCE_COUNT_POSTFIX.size), v)}
checkResourcesMeetRequirements(SPARK_EXECUTOR_RESOURCE_PREFIX, resourcesAndCounts,
execResources)
logInfo("===============================================================================")
logInfo(s"Executor $executorId Resources:")
execResources.foreach { case (k, v) => logInfo(s"$k -> $v") }
logInfo("===============================================================================")
execResources
} else {
if (resourcesFile.nonEmpty) {
logWarning(s"A resources file was specified but the application is not configured " +
s"to use any resources, see the configs with prefix: ${SPARK_TASK_RESOURCE_PREFIX}")
}
Map.empty[String, ResourceInformation]
}
resourceInfo
}
def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
@ -189,13 +291,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: mutable.ListBuffer[URL])
userClassPath: mutable.ListBuffer[URL],
resourcesFile: Option[String])
def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env)
arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
@ -257,6 +360,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
var executorId: String = null
var hostname: String = null
var cores: Int = 0
var resourcesFile: Option[String] = None
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
@ -276,6 +380,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail
case ("--resourcesFile") :: value :: tail =>
resourcesFile = Some(value)
argv = tail
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
@ -301,7 +408,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
userClassPath)
userClassPath, resourcesFile)
}
private def printUsageAndExit(classNameForEntry: String): Unit = {
@ -315,6 +422,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| --executor-id <executorId>
| --hostname <hostname>
| --cores <cores>
| --resourcesFile <fileWithJSONResourceInformation>
| --app-id <appid>
| --worker-url <workerUrl>
| --user-class-path <url>
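
For reference (illustrative, not part of the patch): the file passed via `--resourcesFile` holds a JSON array of ResourceInformation objects. One way to produce such a file, mirroring the json4s helpers used by the test suite below; the output path is hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.json4s.JsonAST.JArray
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Two illustrative resources; a real cluster manager would fill in what it actually allocated.
val gpu = ("name" -> "gpu") ~ ("addresses" -> Seq("0", "1"))
val fpga = ("name" -> "fpga") ~ ("addresses" -> Seq("f1", "f2", "f3"))
val json = compact(render(JArray(List(gpu, fpga))))
// json == """[{"name":"gpu","addresses":["0","1"]},{"name":"fpga","addresses":["f1","f2","f3"]}]"""
Files.write(Paths.get("/tmp/executorResources.json"), json.getBytes(StandardCharsets.UTF_8))
```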

View file

@ -30,6 +30,13 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_
package object config {
private[spark] val SPARK_DRIVER_RESOURCE_PREFIX = "spark.driver.resource."
private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource."
private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
private[spark] val SPARK_RESOURCE_COUNT_POSTFIX = ".count"
private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX = ".discoveryScript"
private[spark] val DRIVER_CLASS_PATH =
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional

View file

@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer
import org.apache.spark.ResourceInformation
import org.apache.spark.TaskState.TaskState
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.ExecutorLossReason
@ -64,7 +65,8 @@ private[spark] object CoarseGrainedClusterMessages {
hostname: String,
cores: Int,
logUrls: Map[String, String],
attributes: Map[String, String])
attributes: Map[String, String],
resources: Map[String, ResourceInformation])
extends CoarseGrainedClusterMessage
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,

View file

@ -185,7 +185,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes) =>
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
attributes, resources) =>
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)

View file

@ -174,9 +174,11 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty))
RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty,
Map.empty))
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty))
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty,
Map.empty))
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)

View file

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files => JavaFiles}
import java.nio.file.attribute.PosixFilePermission._
import java.util.EnumSet
import com.google.common.io.Files
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
class ResourceDiscovererSuite extends SparkFunSuite
with LocalSparkContext {
def mockDiscoveryScript(file: File, result: String): String = {
Files.write(s"echo $result", file, StandardCharsets.UTF_8)
JavaFiles.setPosixFilePermissions(file.toPath(),
EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
file.getPath()
}
test("Resource discoverer no resources") {
val sparkconf = new SparkConf
val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false)
assert(resources.size === 0)
assert(resources.get("gpu").isEmpty,
"Should have a gpus entry that is empty")
}
test("Resource discoverer multiple gpus") {
val sparkconf = new SparkConf
assume(!(Utils.isWindows))
withTempDir { dir =>
val gpuFile = new File(dir, "gpuDiscoverScript")
val scriptPath = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu","addresses":["0", "1"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, scriptPath)
val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false)
val gpuValue = resources.get("gpu")
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")
}
}
test("Resource discoverer no addresses errors") {
val sparkconf = new SparkConf
assume(!(Utils.isWindows))
withTempDir { dir =>
val gpuFile = new File(dir, "gpuDiscoverScript")
val scriptPath = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu"}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, scriptPath)
val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false)
val gpuValue = resources.get("gpu")
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 0, "Should have 0 indexes")
}
}
test("Resource discoverer multiple resource types") {
val sparkconf = new SparkConf
assume(!(Utils.isWindows))
withTempDir { dir =>
val gpuFile = new File(dir, "gpuDiscoverScript")
val gpuDiscovery = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu", "addresses": ["0", "1"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
val fpgaFile = new File(dir, "fpgaDiscoverScript")
val fpgaDiscovery = mockDiscoveryScript(fpgaFile,
"""'{"name": "fpga", "addresses": ["f1", "f2", "f3"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery)
val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = false)
assert(resources.size === 2)
val gpuValue = resources.get("gpu")
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")
val fpgaValue = resources.get("fpga")
assert(fpgaValue.nonEmpty, "Should have a gpu entry")
assert(fpgaValue.get.name == "fpga", "name should be fpga")
assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes")
assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep,
"should have f1,f2,f3 entries")
}
}
test("Resource discoverer multiple gpus on driver") {
val sparkconf = new SparkConf
assume(!(Utils.isWindows))
withTempDir { dir =>
val gpuFile = new File(dir, "gpuDiscoverScript")
val gpuDiscovery = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu", "addresses": ["0", "1"]}'""")
sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, "boguspath")
// make sure it reads from correct config, here it should use driver
val resources = ResourceDiscoverer.findResources(sparkconf, isDriver = true)
val gpuValue = resources.get("gpu")
assert(gpuValue.nonEmpty, "Should have a gpu entry")
assert(gpuValue.get.name == "gpu", "name should be gpu")
assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")
}
}
test("Resource discoverer script returns invalid format") {
val sparkconf = new SparkConf
assume(!(Utils.isWindows))
withTempDir { dir =>
val gpuFile = new File(dir, "gpuDiscoverScript")
val gpuDiscovery = mockDiscoveryScript(gpuFile,
"""'{"addresses": ["0", "1"]}'""")
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, gpuDiscovery)
val error = intercept[SparkException] {
ResourceDiscoverer.findResources(sparkconf, isDriver = false)
}.getMessage()
assert(error.contains("Error running the resource discovery"))
}
}
test("Resource discoverer script doesn't exist") {
val sparkconf = new SparkConf
withTempDir { dir =>
val file1 = new File(dir, "bogusfilepath")
try {
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, file1.getPath())
val error = intercept[SparkException] {
ResourceDiscoverer.findResources(sparkconf, isDriver = false)
}.getMessage()
assert(error.contains("doesn't exist"))
} finally {
JavaFiles.deleteIfExists(file1.toPath())
}
}
}
test("gpu's specified but not discovery script") {
val sparkconf = new SparkConf
sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_COUNT_POSTFIX, "2")
val error = intercept[SparkException] {
ResourceDiscoverer.findResources(sparkconf, isDriver = false)
}.getMessage()
assert(error.contains("User is expecting to use"))
}
}

View file

@ -497,7 +497,8 @@ class StandaloneDynamicAllocationSuite
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty)
val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, Map.empty,
Map.empty)
// Get "localhost" on a blacklist.
val taskScheduler = mock(classOf[TaskSchedulerImpl])
@ -621,7 +622,8 @@ class StandaloneDynamicAllocationSuite
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty)
val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
Map.empty)
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
backend.driverEndpoint.askSync[Boolean](message)
}

View file

@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
import java.io.{File, PrintWriter}
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.{Files => JavaFiles}
import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE}
import java.util.EnumSet
import com.google.common.io.Files
import org.json4s.JsonAST.{JArray, JObject, JString}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}
import org.mockito.Mockito.when
import org.scalatest.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils
class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
with LocalSparkContext with MockitoSugar {
private def writeFileWithJson(dir: File, strToWrite: JArray): String = {
val f1 = File.createTempFile("test-resource-parser1", "", dir)
JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes())
f1.getPath()
}
test("parsing no resources") {
val conf = new SparkConf
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
val testResourceArgs: JObject = ("" -> "")
val ja = JArray(List(testResourceArgs))
val f1 = writeFileWithJson(tmpDir, ja)
var error = intercept[SparkException] {
val parsedResources = backend.parseOrFindResources(Some(f1))
}.getMessage()
assert(error.contains("Exception parsing the resources in"),
s"Calling with no resources didn't error as expected, error: $error")
}
}
test("parsing one resources") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
val testResourceArgs =
("name" -> "gpu") ~
("addresses" -> Seq("0", "1"))
val ja = JArray(List(testResourceArgs))
val f1 = writeFileWithJson(tmpDir, ja)
val parsedResources = backend.parseOrFindResources(Some(f1))
assert(parsedResources.size === 1)
assert(parsedResources.get("gpu").nonEmpty)
assert(parsedResources.get("gpu").get.name === "gpu")
assert(parsedResources.get("gpu").get.addresses.deep === Array("0", "1").deep)
}
}
test("parsing multiple resources") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3")
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
val gpuArgs =
("name" -> "gpu") ~
("addresses" -> Seq("0", "1"))
val fpgaArgs =
("name" -> "fpga") ~
("addresses" -> Seq("f1", "f2", "f3"))
val ja = JArray(List(gpuArgs, fpgaArgs))
val f1 = writeFileWithJson(tmpDir, ja)
val parsedResources = backend.parseOrFindResources(Some(f1))
assert(parsedResources.size === 2)
assert(parsedResources.get("gpu").nonEmpty)
assert(parsedResources.get("gpu").get.name === "gpu")
assert(parsedResources.get("gpu").get.addresses.deep === Array("0", "1").deep)
assert(parsedResources.get("fpga").nonEmpty)
assert(parsedResources.get("fpga").get.name === "fpga")
assert(parsedResources.get("fpga").get.addresses.deep === Array("f1", "f2", "f3").deep)
}
}
test("error checking parsing resources and executor and task configs") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
4, Seq.empty[URL], env, None)
// not enough gpu's on the executor
withTempDir { tmpDir =>
val gpuArgs =
("name" -> "gpu") ~
("addresses" -> Seq("0"))
val ja = JArray(List(gpuArgs))
val f1 = writeFileWithJson(tmpDir, ja)
var error = intercept[SparkException] {
val parsedResources = backend.parseOrFindResources(Some(f1))
}.getMessage()
assert(error.contains("doesn't meet the requirements of needing"))
}
// missing resource on the executor
withTempDir { tmpDir =>
val gpuArgs =
("name" -> "fpga") ~
("addresses" -> Seq("0"))
val ja = JArray(List(gpuArgs))
val f1 = writeFileWithJson(tmpDir, ja)
var error = intercept[SparkException] {
val parsedResources = backend.parseOrFindResources(Some(f1))
}.getMessage()
assert(error.contains("Executor resource config missing required task resource"))
}
}
test("executor resource found less than required") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "4")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "1")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
4, Seq.empty[URL], env, None)
// executor resources < required
withTempDir { tmpDir =>
val gpuArgs =
("name" -> "gpu") ~
("addresses" -> Seq("0", "1"))
val ja = JArray(List(gpuArgs))
val f1 = writeFileWithJson(tmpDir, ja)
var error = intercept[SparkException] {
val parsedResources = backend.parseOrFindResources(Some(f1))
}.getMessage()
assert(error.contains("is less than what the user requested for count"))
}
}
test("parsing resources task configs with missing executor config") {
val conf = new SparkConf
conf.set(SPARK_TASK_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_POSTFIX, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
val gpuArgs =
("name" -> "gpu") ~
("addresses" -> Seq("0", "1"))
val ja = JArray(List(gpuArgs))
val f1 = writeFileWithJson(tmpDir, ja)
var error = intercept[SparkException] {
val parsedResources = backend.parseOrFindResources(Some(f1))
}.getMessage()
assert(error.contains("Resource: gpu not specified via config: " +
"spark.executor.resource.gpu.count, but required, please " +
"fix your configuration"))
}
}
test("use discoverer") {
val conf = new SparkConf
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3")
conf.set(SPARK_TASK_RESOURCE_PREFIX + "fpga" + SPARK_RESOURCE_COUNT_POSTFIX, "3")
assume(!(Utils.isWindows))
withTempDir { dir =>
val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga")
Files.write("""echo '{"name": "fpga","addresses":["f1", "f2", "f3"]}'""",
fpgaDiscovery, StandardCharsets.UTF_8)
JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(),
EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + "fpga" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_POSTFIX, fpgaDiscovery.getPath())
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
4, Seq.empty[URL], env, None)
val parsedResources = backend.parseOrFindResources(None)
assert(parsedResources.size === 1)
assert(parsedResources.get("fpga").nonEmpty)
assert(parsedResources.get("fpga").get.name === "fpga")
assert(parsedResources.get("fpga").get.addresses.deep === Array("f1", "f2", "f3").deep)
}
}
private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
val mockEnv = mock[SparkEnv]
val mockRpcEnv = mock[RpcEnv]
when(mockEnv.conf).thenReturn(conf)
when(mockEnv.serializer).thenReturn(serializer)
when(mockEnv.closureSerializer).thenReturn(serializer)
when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
SparkEnv.set(mockEnv)
mockEnv
}
}

View file

@ -164,11 +164,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
sc.addSparkListener(listener)
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
assert(executorAddedCount === 3)

View file

@ -221,6 +221,25 @@ of the most common options to set are:
This option is currently supported on YARN and Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.executor.resource.{resourceType}.count</code></td>
<td>0</td>
<td>
The number of a particular resource type to use per executor process.
If this is used, you must also specify the
<code>spark.executor.resource.{resourceType}.discoveryScript</code>
for the executor to find the resource on startup.
</td>
</tr>
<tr>
<td><code>spark.executor.resource.{resourceType}.discoveryScript</code></td>
<td>None</td>
<td>
A script for the executor to run to discover a particular resource type. This should
write to STDOUT a JSON string in the format of the ResourceInformation class. This has a
name and an array of addresses.
</td>
</tr>
<tr>
<td><code>spark.extraListeners</code></td>
<td>(none)</td>
@ -1793,6 +1812,15 @@ Apart from these, the following properties are also available, and may be useful
Number of cores to allocate for each task.
</td>
</tr>
<tr>
<td><code>spark.task.resource.{resourceType}.count</code></td>
<td>1</td>
<td>
Number of a particular resource type to allocate for each task. If this is specified
you must also provide the executor config <code>spark.executor.resource.{resourceType}.count</code>
and any corresponding discovery configs so that your executors are created with that resource type.
</td>
</tr>
<tr>
<td><code>spark.task.maxFailures</code></td>
<td>4</td>

View file

@ -692,7 +692,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val mockEndpointRef = mock[RpcEndpointRef]
val mockAddress = mock[RpcAddress]
val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty,
Map.empty)
Map.empty, Map.empty)
backend.driverEndpoint.askSync[Boolean](message)
}

View file

@ -37,7 +37,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
env: SparkEnv,
resourcesFile: Option[String])
extends CoarseGrainedExecutorBackend(
rpcEnv,
driverUrl,
@ -45,7 +46,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
hostname,
cores,
userClassPath,
env) with Logging {
env,
resourcesFile) with Logging {
private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
@ -66,7 +68,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env)
arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile)
}
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
this.getClass.getCanonicalName.stripSuffix("$"))