[SPARK-29306][CORE] Stage Level Sched: Executors need to track what ResourceProfile they are created with

### What changes were proposed in this pull request?

This is the second PR for Stage Level Scheduling. It adds the necessary executor-side changes:
1) executors know which ResourceProfile they should be using
2) executors handle parsing the resource profile settings - these are not in the global configs
3) executors report back to the driver which resource profile they were started with.

This PR adds all the piping for YARN to pass the information all the way to the executors, but it just uses the default ResourceProfile (which is the global application-level configs).

At a high level these changes include:
1) adding a new --resourceProfileId option to the CoarseGrainedExecutorBackend (see the sketch after this list)
2) adding the ResourceProfile settings to new internal confs that get passed into the Executor
3) Executor changes that use the resource profile id passed in to read the corresponding ResourceProfile confs, parse those requests, and discover resources as necessary
4) having the Executor register with the Driver using the resource profile id so that the ExecutorMonitor can track how many executors with each profile are running
5) YARN-side changes to show that passing the resource profile id and confs actually works; this just uses the DefaultResourceProfile for now.
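
The snippet below is a minimal, illustrative sketch (not Spark code) of how the new --resourceProfileId option behaves: if it is not supplied, the executor falls back to the default profile id 0, which corresponds to the application-level configs.

```scala
// Illustrative sketch of the new --resourceProfileId option handling; only the option
// name and the default id (0) come from this PR, the parsing helper is hypothetical.
val DEFAULT_RESOURCE_PROFILE_ID = 0

def parseProfileId(args: List[String]): Int = {
  var resourceProfileId = DEFAULT_RESOURCE_PROFILE_ID
  var argv = args
  while (argv.nonEmpty) {
    argv = argv match {
      case "--resourceProfileId" :: value :: tail =>
        resourceProfileId = value.toInt
        tail
      case _ :: tail => tail
      case Nil => Nil
    }
  }
  resourceProfileId
}

// Executors launched without the option keep the default profile; executors launched
// for a specific profile report that id back to the driver when they register.
assert(parseProfileId(List("--cores", "4")) == DEFAULT_RESOURCE_PROFILE_ID)
assert(parseProfileId(List("--cores", "4", "--resourceProfileId", "3")) == 3)
```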

I also removed a check from the CoarseGrainedExecutorBackend that made sure there were task requirements before parsing any custom executor resource requests. With resource profiles this becomes much more expensive, because we would then have to pass the task requests to each executor; the check was just a shortcut and not really needed, so it was much cleaner to remove it.

Note there were some changes to ResourceProfile, ExecutorResourceRequests, and TaskResourceRequests in this PR as well, because I discovered some issues with things not being immutable. That API now looks like:

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val rpBuilder = new ResourceProfileBuilder()
val ereq = new ExecutorResourceRequests()
val treq = new TaskResourceRequests()

ereq.cores(2).memory("6g").memoryOverhead("2g").pysparkMemory("2g").resource("gpu", 2, "/home/tgraves/getGpus")
treq.cpus(2).resource("gpu", 2)

val resourceProfile = rpBuilder.require(ereq).require(treq).build
```

This makes it so that the ResourceProfile is immutable, and Spark can use it directly without worrying about the user changing it.
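
As a quick illustration of that immutability (a sketch, assuming these classes are visible to the caller; several of them are still private[spark] at this stage of development), the requests copied into a built profile are not affected by later mutation of the request objects:

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val ereq = new ExecutorResourceRequests().cores(2).memory("6g")
val treq = new TaskResourceRequests().cpus(2)
val rp = new ResourceProfileBuilder().require(ereq).require(treq).build

// The built profile exposes immutable maps of the requests.
println(rp.executorResources("cores").amount)  // 2
println(rp.taskResources("cpus").amount)       // 2.0

// Mutating the original request objects afterwards does not change the built profile.
ereq.cores(8)
println(rp.executorResources("cores").amount)  // still 2
```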

### Why are the changes needed?

These changes are needed for the executors to report which ResourceProfile they are using, so that ultimately the dynamic allocation manager can use that information to know how many executors with a given profile are running and how many more it needs to request. It is also needed to get the resource profile confs to the executor so that it can run the appropriate discovery script if needed.
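
The snippet below gives a rough, hypothetical sketch of that executor-side step: the executor pulls the custom resources out of its ResourceProfile, discovers addresses for each, and checks that the discovered amounts cover what was requested. The real logic lives in ResourceUtils.getOrDiscoverAllResourcesForResourceProfile and also handles a resources file; the runScript parameter here simply stands in for executing the discovery script.

```scala
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}

// Everything that is not a standard executor resource (cores/memory/etc.) is a custom
// resource such as gpu or fpga and may need a discovery script to find its addresses.
val standardResources = Set("cores", "memory", "memoryOverhead", "pyspark.memory")

def discoverForProfile(
    rp: ResourceProfile,
    runScript: String => Array[String]): Map[String, ResourceInformation] = {
  val custom = rp.executorResources.filter { case (name, _) => !standardResources.contains(name) }
  val discovered = custom.map { case (rName, req) =>
    val addresses = runScript(req.discoveryScript)  // e.g. runs /home/tgraves/getGpus
    rName -> new ResourceInformation(rName, addresses)
  }
  // Fail fast if the discovered addresses do not cover the profile's requested amount.
  discovered.foreach { case (rName, info) =>
    require(info.addresses.length >= custom(rName).amount,
      s"Resource $rName: discovered ${info.addresses.length}, requested ${custom(rName).amount}")
  }
  discovered
}
```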

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Unit tests and manual testing on YARN.

Closes #26682 from tgravescs/SPARK-29306.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Thomas Graves 2020-01-17 08:15:25 -06:00 committed by Thomas Graves
parent 64fe192fef
commit 6dbfa2bb9c
29 changed files with 756 additions and 336 deletions

@ -35,6 +35,8 @@ import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
@ -51,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend(
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv,
resourcesFileOpt: Option[String])
resourcesFileOpt: Option[String],
resourceProfile: ResourceProfile)
extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
import CoarseGrainedExecutorBackend._
@ -80,7 +83,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractAttributes, resources))
extractAttributes, resources, resourceProfile.id))
}(ThreadUtils.sameThread).onComplete {
case Success(_) =>
self.send(RegisteredExecutor)
@ -91,24 +94,13 @@ private[spark] class CoarseGrainedExecutorBackend(
// visible for testing
def parseOrFindResources(resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = {
// only parse the resources if a task requires them
val resourceInfo = if (parseResourceRequirements(env.conf, SPARK_TASK_PREFIX).nonEmpty) {
val resources = getOrDiscoverAllResources(env.conf, SPARK_EXECUTOR_PREFIX, resourcesFileOpt)
if (resources.isEmpty) {
throw new SparkException("User specified resources per task via: " +
s"$SPARK_TASK_PREFIX, but can't find any resources available on the executor.")
} else {
logResourceInfo(SPARK_EXECUTOR_PREFIX, resources)
}
resources
} else {
if (resourcesFileOpt.nonEmpty) {
logWarning("A resources file was specified but the application is not configured " +
s"to use any resources, see the configs with prefix: ${SPARK_TASK_PREFIX}")
}
Map.empty[String, ResourceInformation]
}
resourceInfo
logDebug(s"Resource profile id is: ${resourceProfile.id}")
val resources = getOrDiscoverAllResourcesForResourceProfile(
resourcesFileOpt,
SPARK_EXECUTOR_PREFIX,
resourceProfile)
logResourceInfo(SPARK_EXECUTOR_PREFIX, resources)
resources
}
def extractLogUrls: Map[String, String] = {
@ -237,14 +229,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
appId: String,
workerUrl: Option[String],
userClassPath: mutable.ListBuffer[URL],
resourcesFileOpt: Option[String])
resourcesFileOpt: Option[String],
resourceProfileId: Int)
def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt)
arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
@ -252,7 +245,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
def run(
arguments: Arguments,
backendCreateFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend): Unit = {
backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend): Unit = {
Utils.initDaemon(log)
@ -284,7 +278,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
}
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
fetcher.shutdown()
@ -307,7 +301,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
arguments.workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
@ -325,6 +320,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
var argv = args.toList
while (!argv.isEmpty) {
@ -357,6 +353,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case ("--resourceProfileId") :: value :: tail =>
resourceProfileId = value.toInt
argv = tail
case Nil =>
case tail =>
// scalastyle:off println
@ -380,7 +379,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
userClassPath, resourcesFileOpt)
userClassPath, resourcesFileOpt, resourceProfileId)
}
private def printUsageAndExit(classNameForEntry: String): Unit = {
@ -399,6 +398,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| --app-id <appid>
| --worker-url <workerUrl>
| --user-class-path <url>
| --resourceProfileId <id>
|""".stripMargin)
// scalastyle:on println
System.exit(1)

@ -17,10 +17,6 @@
package org.apache.spark.resource
import scala.collection.mutable
import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
/**
* An Executor resource request. This is used in conjunction with the ResourceProfile to
* programmatically specify the resources needed for an RDD that will be applied at the
@ -28,16 +24,13 @@ import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
*
* This is used to specify what the resource requirements are for an Executor and how
* Spark can find out specific details about those resources. Not all the parameters are
* required for every resource type. The resources names supported
* correspond to the regular Spark configs with the prefix removed. For instance overhead
* memory in this api is memoryOverhead, which is spark.executor.memoryOverhead with
* spark.executor removed. Resources like GPUs are resource.gpu
* (spark configs spark.executor.resource.gpu.*). The amount, discoveryScript, and vendor
* parameters for resources are all the same parameters a user would specify through the
* required for every resource type. Resources like GPUs are supported and have same limitations
* as using the global spark configs spark.executor.resource.gpu.*. The amount, discoveryScript,
* and vendor parameters for resources are all the same parameters a user would specify through the
* configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}.
*
* For instance, a user wants to allocate an Executor with GPU resources on YARN. The user has
* to specify the resource name (resource.gpu), the amount or number of GPUs per Executor,
* to specify the resource name (gpu), the amount or number of GPUs per Executor,
* the discovery script would be specified so that when the Executor starts up it can
* discovery what GPU addresses are available for it to use because YARN doesn't tell
* Spark that, then vendor would not be used because its specific for Kubernetes.
@ -63,15 +56,21 @@ private[spark] class ExecutorResourceRequest(
val discoveryScript: String = "",
val vendor: String = "") extends Serializable {
// A list of allowed Spark internal resources. Custom resources (spark.executor.resource.*)
// like GPUs/FPGAs are also allowed, see the check below.
private val allowedExecutorResources = mutable.HashSet[String](
ResourceProfile.MEMORY,
ResourceProfile.OVERHEAD_MEM,
ResourceProfile.PYSPARK_MEM,
ResourceProfile.CORES)
override def equals(obj: Any): Boolean = {
obj match {
case that: ExecutorResourceRequest =>
that.getClass == this.getClass &&
that.resourceName == resourceName && that.amount == amount &&
that.discoveryScript == discoveryScript && that.vendor == vendor
case _ =>
false
}
}
if (!allowedExecutorResources.contains(resourceName) && !resourceName.startsWith(RESOURCE_DOT)) {
throw new IllegalArgumentException(s"Executor resource not allowed: $resourceName")
override def hashCode(): Int =
Seq(resourceName, amount, discoveryScript, vendor).hashCode()
override def toString(): String = {
s"name: $resourceName, amount: $amount, script: $discoveryScript, vendor: $vendor"
}
}

@ -17,7 +17,9 @@
package org.apache.spark.resource
import scala.collection.mutable
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.resource.ResourceProfile._
@ -32,9 +34,9 @@ import org.apache.spark.resource.ResourceProfile._
*/
private[spark] class ExecutorResourceRequests() extends Serializable {
private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
def requests: Map[String, ExecutorResourceRequest] = _executorResources.toMap
def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
/**
* Specify heap memory. The value specified will be converted to MiB.
@ -44,8 +46,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
*/
def memory(amount: String): this.type = {
val amountMiB = JavaUtils.byteStringAsMb(amount)
val rr = new ExecutorResourceRequest(MEMORY, amountMiB)
_executorResources(MEMORY) = rr
val req = new ExecutorResourceRequest(MEMORY, amountMiB)
_executorResources.put(MEMORY, req)
this
}
@ -57,8 +59,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
*/
def memoryOverhead(amount: String): this.type = {
val amountMiB = JavaUtils.byteStringAsMb(amount)
val rr = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB)
_executorResources(OVERHEAD_MEM) = rr
val req = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB)
_executorResources.put(OVERHEAD_MEM, req)
this
}
@ -70,8 +72,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
*/
def pysparkMemory(amount: String): this.type = {
val amountMiB = JavaUtils.byteStringAsMb(amount)
val rr = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB)
_executorResources(PYSPARK_MEM) = rr
val req = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB)
_executorResources.put(PYSPARK_MEM, req)
this
}
@ -81,15 +83,17 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
* @param amount Number of cores to allocate per Executor.
*/
def cores(amount: Int): this.type = {
val t = new ExecutorResourceRequest(CORES, amount)
_executorResources(CORES) = t
val req = new ExecutorResourceRequest(CORES, amount)
_executorResources.put(CORES, req)
this
}
/**
* Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
* correspond to the regular Spark configs with the prefix removed. For instance, resources
* like GPUs are resource.gpu (spark configs spark.executor.resource.gpu.*)
* like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource
* that the cluster manager doesn't support the result is undefined, it may error or may just
* be ignored.
*
* @param resourceName Name of the resource.
* @param amount amount of that resource per executor to use.
@ -106,13 +110,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable {
vendor: String = ""): this.type = {
// a bit weird but for Java api use empty string as meaning None because empty
// string is otherwise invalid for those paramters anyway
val eReq = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
_executorResources(resourceName) = eReq
this
}
def addRequest(ereq: ExecutorResourceRequest): this.type = {
_executorResources(ereq.resourceName) = ereq
val req = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor)
_executorResources.put(resourceName, req)
this
}

@ -18,130 +18,164 @@
package org.apache.spark.resource
import java.util.{Map => JMap}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
/**
* Resource profile to associate with an RDD. A ResourceProfile allows the user to
* specify executor and task requirements for an RDD that will get applied during a
* stage. This allows the user to change the resource requirements between stages.
*
* This class is private now for initial development, once we have the feature in place
* this will become public.
* This is meant to be immutable so user can't change it after building.
*/
@Evolving
private[spark] class ResourceProfile() extends Serializable {
class ResourceProfile(
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging {
private val _id = ResourceProfile.getNextProfileId
private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]()
// _id is only a var for testing purposes
private var _id = ResourceProfile.getNextProfileId
def id: Int = _id
def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap
def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.toMap
/**
* (Java-specific) gets a Java Map of resources to TaskResourceRequest
*/
def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asJava
def taskResourcesJMap: JMap[String, TaskResourceRequest] = taskResources.asJava
/**
* (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
*/
def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asJava
def reset(): Unit = {
_taskResources.clear()
_executorResources.clear()
def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
executorResources.asJava
}
def require(requests: ExecutorResourceRequests): this.type = {
_executorResources ++= requests.requests
this
// Note that some cluster managers don't set the executor cores explicitly so
// be sure to check the Option as required
private[spark] def getExecutorCores: Option[Int] = {
executorResources.get(ResourceProfile.CORES).map(_.amount.toInt)
}
def require(requests: TaskResourceRequests): this.type = {
_taskResources ++= requests.requests
this
private[spark] def getTaskCpus: Option[Int] = {
taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
}
// testing only
private[spark] def setToDefaultProfile(): Unit = {
_id = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
}
override def equals(obj: Any): Boolean = {
obj match {
case that: ResourceProfile =>
that.getClass == this.getClass && that.id == _id &&
that.taskResources == taskResources && that.executorResources == executorResources
case _ =>
false
}
}
override def hashCode(): Int = Seq(taskResources, executorResources).hashCode()
override def toString(): String = {
s"Profile: id = ${_id}, executor resources: ${_executorResources}, " +
s"task resources: ${_taskResources}"
s"Profile: id = ${_id}, executor resources: ${executorResources.mkString(",")}, " +
s"task resources: ${taskResources.mkString(",")}"
}
}
private[spark] object ResourceProfile extends Logging {
val UNKNOWN_RESOURCE_PROFILE_ID = -1
val DEFAULT_RESOURCE_PROFILE_ID = 0
object ResourceProfile extends Logging {
// task resources
val CPUS = "cpus"
// Executor resources
val CORES = "cores"
val MEMORY = "memory"
val OVERHEAD_MEM = "memoryOverhead"
val PYSPARK_MEM = "pyspark.memory"
// all supported spark executor resources (minus the custom resources like GPUs/FPGAs)
val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM)
val UNKNOWN_RESOURCE_PROFILE_ID = -1
val DEFAULT_RESOURCE_PROFILE_ID = 0
private lazy val nextProfileId = new AtomicInteger(0)
private val DEFAULT_PROFILE_LOCK = new Object()
// The default resource profile uses the application level configs.
// Create the default profile immediately to get ID 0, its initialized later when fetched.
private val defaultProfileRef: AtomicReference[ResourceProfile] =
new AtomicReference[ResourceProfile](new ResourceProfile())
// var so that it can be reset for testing purposes.
@GuardedBy("DEFAULT_PROFILE_LOCK")
private var defaultProfile: Option[ResourceProfile] = None
assert(defaultProfileRef.get().id == DEFAULT_RESOURCE_PROFILE_ID,
s"Default Profile must have the default profile id: $DEFAULT_RESOURCE_PROFILE_ID")
private[spark] def getNextProfileId: Int = nextProfileId.getAndIncrement()
def getNextProfileId: Int = nextProfileId.getAndIncrement()
def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
val defaultProf = defaultProfileRef.get()
// check to see if the default profile was initialized yet
if (defaultProf.executorResources == Map.empty) {
synchronized {
val prof = defaultProfileRef.get()
if (prof.executorResources == Map.empty) {
addDefaultTaskResources(prof, conf)
addDefaultExecutorResources(prof, conf)
}
prof
private[spark] def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = {
DEFAULT_PROFILE_LOCK.synchronized {
defaultProfile match {
case Some(prof) => prof
case None =>
val taskResources = getDefaultTaskResources(conf)
val executorResources = getDefaultExecutorResources(conf)
val defProf = new ResourceProfile(executorResources, taskResources)
defProf.setToDefaultProfile
defaultProfile = Some(defProf)
logInfo("Default ResourceProfile created, executor resources: " +
s"${defProf.executorResources}, task resources: " +
s"${defProf.taskResources}")
defProf
}
} else {
defaultProf
}
}
private def addDefaultTaskResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
private def getDefaultTaskResources(conf: SparkConf): Map[String, TaskResourceRequest] = {
val cpusPerTask = conf.get(CPUS_PER_TASK)
val treqs = new TaskResourceRequests().cpus(cpusPerTask)
val taskReq = ResourceUtils.parseResourceRequirements(conf, SPARK_TASK_PREFIX)
taskReq.foreach { req =>
val name = s"${RESOURCE_PREFIX}.${req.resourceName}"
treqs.resource(name, req.amount)
}
rprof.require(treqs)
ResourceUtils.addTaskResourceRequests(conf, treqs)
treqs.requests
}
private def addDefaultExecutorResources(rprof: ResourceProfile, conf: SparkConf): Unit = {
private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
val ereqs = new ExecutorResourceRequests()
ereqs.cores(conf.get(EXECUTOR_CORES))
ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
execReq.foreach { req =>
val name = s"${RESOURCE_PREFIX}.${req.id.resourceName}"
val name = req.id.resourceName
ereqs.resource(name, req.amount, req.discoveryScript.getOrElse(""),
req.vendor.getOrElse(""))
}
rprof.require(ereqs)
ereqs.requests
}
// for testing purposes
def resetDefaultProfile(conf: SparkConf): Unit = getOrCreateDefaultProfile(conf).reset()
// for testing only
private[spark] def reInitDefaultProfile(conf: SparkConf): Unit = {
clearDefaultProfile
// force recreate it after clearing
getOrCreateDefaultProfile(conf)
}
// for testing only
private[spark] def clearDefaultProfile: Unit = {
DEFAULT_PROFILE_LOCK.synchronized {
defaultProfile = None
}
}
private[spark] def getCustomTaskResources(
rp: ResourceProfile): Map[String, TaskResourceRequest] = {
rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS))
}
private[spark] def getCustomExecutorResources(
rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
rp.executorResources.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k))
}
}

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.resource
import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.annotation.Evolving
/**
* Resource profile builder to build a Resource profile to associate with an RDD.
* A ResourceProfile allows the user to specify executor and task requirements for an RDD
* that will get applied during a stage. This allows the user to change the resource
* requirements between stages.
*/
@Evolving
class ResourceProfileBuilder() {
private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap
/**
* (Java-specific) gets a Java Map of resources to TaskResourceRequest
*/
def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asScala.asJava
/**
* (Java-specific) gets a Java Map of resources to ExecutorResourceRequest
*/
def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = {
_executorResources.asScala.asJava
}
def require(requests: ExecutorResourceRequests): this.type = {
_executorResources.putAll(requests.requests.asJava)
this
}
def require(requests: TaskResourceRequests): this.type = {
_taskResources.putAll(requests.requests.asJava)
this
}
def clearExecutorResourceRequests(): this.type = {
_executorResources.clear()
this
}
def clearTaskResourceRequests(): this.type = {
_taskResources.clear()
this
}
override def toString(): String = {
"Profile executor resources: " +
s"${_executorResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}, " +
s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}"
}
def build: ResourceProfile = {
new ResourceProfile(executorResources, taskResources)
}
}

@ -111,7 +111,7 @@ private[spark] object ResourceUtils extends Logging {
}
def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_DOT").map { case (key, _) =>
sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_PREFIX.").map { case (key, _) =>
key.substring(0, key.indexOf('.'))
}.toSet.toSeq.map(name => ResourceID(componentName, name))
}
@ -124,6 +124,35 @@ private[spark] object ResourceUtils extends Logging {
.filter(_.amount > 0)
}
// Used to take a fraction amount from a task resource requirement and split into a real
// integer amount and the number of parts expected. For instance, if the amount is 0.5,
// the we get (1, 2) back out.
// Returns tuple of (amount, numParts)
def calculateAmountAndPartsForFraction(amount: Double): (Int, Int) = {
val parts = if (amount <= 0.5) {
Math.floor(1.0 / amount).toInt
} else if (amount % 1 != 0) {
throw new SparkException(
s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
} else {
1
}
(Math.ceil(amount).toInt, parts)
}
// Add any task resource requests from the spark conf to the TaskResourceRequests passed in
def addTaskResourceRequests(
sparkConf: SparkConf,
treqs: TaskResourceRequests): Unit = {
listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { resourceId =>
val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
val amountDouble = settings.getOrElse(AMOUNT,
throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}")
).toDouble
treqs.resource(resourceId.resourceName, amountDouble)
}
}
def parseResourceRequirements(sparkConf: SparkConf, componentName: String)
: Seq[ResourceRequirement] = {
val resourceIds = listResourceIds(sparkConf, componentName)
@ -136,15 +165,7 @@ private[spark] object ResourceUtils extends Logging {
}
rnamesAndAmounts.filter { case (_, amount) => amount > 0 }.map { case (rName, amountDouble) =>
val (amount, parts) = if (componentName.equalsIgnoreCase(SPARK_TASK_PREFIX)) {
val parts = if (amountDouble <= 0.5) {
Math.floor(1.0 / amountDouble).toInt
} else if (amountDouble % 1 != 0) {
throw new SparkException(
s"The resource amount ${amountDouble} must be either <= 0.5, or a whole number.")
} else {
1
}
(Math.ceil(amountDouble).toInt, parts)
calculateAmountAndPartsForFraction(amountDouble)
} else if (amountDouble % 1 != 0) {
throw new SparkException(
s"Only tasks support fractional resources, please check your $componentName settings")
@ -181,12 +202,18 @@ private[spark] object ResourceUtils extends Logging {
}
}
def parseAllocated(
resourcesFileOpt: Option[String],
componentName: String): Seq[ResourceAllocation] = {
resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile)
.filter(_.id.componentName == componentName)
}
private def parseAllocatedOrDiscoverResources(
sparkConf: SparkConf,
componentName: String,
resourcesFileOpt: Option[String]): Seq[ResourceAllocation] = {
val allocated = resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile)
.filter(_.id.componentName == componentName)
val allocated = parseAllocated(resourcesFileOpt, componentName)
val otherResourceIds = listResourceIds(sparkConf, componentName).diff(allocated.map(_.id))
val otherResources = otherResourceIds.flatMap { id =>
val request = parseResourceRequest(sparkConf, id)
@ -215,9 +242,24 @@ private[spark] object ResourceUtils extends Logging {
requests.foreach(r => assertResourceAllocationMeetsRequest(allocated(r.id), r))
}
private def assertAllResourceAllocationsMatchResourceProfile(
allocations: Map[String, ResourceInformation],
execReqs: Map[String, ExecutorResourceRequest]): Unit = {
execReqs.foreach { case (rName, req) =>
require(allocations.contains(rName) && allocations(rName).addresses.size >= req.amount,
s"Resource: ${rName}, with addresses: " +
s"${allocations(rName).addresses.mkString(",")} " +
s"is less than what the user requested: ${req.amount})")
}
}
/**
* Gets all allocated resource information for the input component from input resources file and
* discover the remaining via discovery scripts.
* the application level Spark configs. It first looks to see if resource were explicitly
* specified in the resources file (this would include specified address assignments and it only
* specified in certain cluster managers) and then it looks at the Spark configs to get any
* others not specified in the file. The resources not explicitly set in the file require a
* discovery script for it to run to get the addresses of the resource.
* It also verifies the resource allocation meets required amount for each resource.
* @return a map from resource name to resource info
*/
@ -232,6 +274,37 @@ private[spark] object ResourceUtils extends Logging {
resourceInfoMap
}
/**
* This function is similar to getOrDiscoverallResources, except for it uses the ResourceProfile
* information instead of the application level configs.
*
* It first looks to see if resource were explicitly specified in the resources file
* (this would include specified address assignments and it only specified in certain
* cluster managers) and then it looks at the ResourceProfile to get
* any others not specified in the file. The resources not explicitly set in the file require a
* discovery script for it to run to get the addresses of the resource.
* It also verifies the resource allocation meets required amount for each resource.
*
* @return a map from resource name to resource info
*/
def getOrDiscoverAllResourcesForResourceProfile(
resourcesFileOpt: Option[String],
componentName: String,
resourceProfile: ResourceProfile): Map[String, ResourceInformation] = {
val fileAllocated = parseAllocated(resourcesFileOpt, componentName)
val fileAllocResMap = fileAllocated.map(a => (a.id.resourceName, a.toResourceInformation)).toMap
// only want to look at the ResourceProfile for resources not in the resources file
val execReq = ResourceProfile.getCustomExecutorResources(resourceProfile)
val filteredExecreq = execReq.filterNot { case (rname, _) => fileAllocResMap.contains(rname) }
val rpAllocations = filteredExecreq.map { case (rName, execRequest) =>
val addrs = discoverResource(rName, Option(execRequest.discoveryScript)).addresses
(rName, new ResourceInformation(rName, addrs))
}
val allAllocations = fileAllocResMap ++ rpAllocations
assertAllResourceAllocationsMatchResourceProfile(allAllocations, execReq)
allAllocations
}
def logResourceInfo(componentName: String, resources: Map[String, ResourceInformation])
: Unit = {
logInfo("==============================================================")
@ -240,9 +313,9 @@ private[spark] object ResourceUtils extends Logging {
}
// visible for test
private[spark] def discoverResource(resourceRequest: ResourceRequest): ResourceInformation = {
val resourceName = resourceRequest.id.resourceName
val script = resourceRequest.discoveryScript
private[spark] def discoverResource(
resourceName: String,
script: Option[String]): ResourceInformation = {
val result = if (script.nonEmpty) {
val scriptFile = new File(script.get)
// check that script exists and try to execute
@ -264,10 +337,16 @@ private[spark] object ResourceUtils extends Logging {
result
}
// visible for test
private[spark] def discoverResource(resourceRequest: ResourceRequest): ResourceInformation = {
val resourceName = resourceRequest.id.resourceName
val script = resourceRequest.discoveryScript
discoverResource(resourceName, script)
}
// known types of resources
final val GPU: String = "gpu"
final val FPGA: String = "fpga"
final val RESOURCE_PREFIX: String = "resource"
final val RESOURCE_DOT: String = s"$RESOURCE_PREFIX."
}

@ -17,10 +17,6 @@
package org.apache.spark.resource
import scala.collection.mutable
import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT
/**
* A task resource request. This is used in conjuntion with the ResourceProfile to
* programmatically specify the resources needed for an RDD that will be applied at the
@ -37,7 +33,19 @@ private[spark] class TaskResourceRequest(val resourceName: String, val amount: D
assert(amount <= 0.5 || amount % 1 == 0,
s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
if (!resourceName.equals(ResourceProfile.CPUS) && !resourceName.startsWith(RESOURCE_DOT)) {
throw new IllegalArgumentException(s"Task resource not allowed: $resourceName")
override def equals(obj: Any): Boolean = {
obj match {
case that: TaskResourceRequest =>
that.getClass == this.getClass &&
that.resourceName == resourceName && that.amount == amount
case _ =>
false
}
}
override def hashCode(): Int = Seq(resourceName, amount).hashCode()
override def toString(): String = {
s"name: $resourceName, amount: $amount"
}
}

@ -17,10 +17,11 @@
package org.apache.spark.resource
import scala.collection.mutable
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.resource.ResourceProfile._
import org.apache.spark.resource.ResourceUtils._
/**
* A set of task resource requests. This is used in conjuntion with the ResourceProfile to
@ -32,9 +33,9 @@ import org.apache.spark.resource.ResourceUtils._
*/
private[spark] class TaskResourceRequests() extends Serializable {
private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]()
private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
def requests: Map[String, TaskResourceRequest] = _taskResources.toMap
def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap
/**
* Specify number of cpus per Task.
@ -42,15 +43,13 @@ private[spark] class TaskResourceRequests() extends Serializable {
* @param amount Number of cpus to allocate per Task.
*/
def cpus(amount: Int): this.type = {
val t = new TaskResourceRequest(CPUS, amount)
_taskResources(CPUS) = t
val treq = new TaskResourceRequest(CPUS, amount)
_taskResources.put(CPUS, treq)
this
}
/**
* Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported
* correspond to the regular Spark configs with the prefix removed. For instance, resources
* like GPUs are resource.gpu (spark configs spark.task.resource.gpu.*)
* Amount of a particular custom resource(GPU, FPGA, etc) to use.
*
* @param resourceName Name of the resource.
* @param amount Amount requesting as a Double to support fractional resource requests.
@ -58,14 +57,14 @@ private[spark] class TaskResourceRequests() extends Serializable {
* lets you configure X number of tasks to run on a single resource,
* ie amount equals 0.5 translates into 2 tasks per resource address.
*/
def resource(rName: String, amount: Double): this.type = {
val t = new TaskResourceRequest(rName, amount)
_taskResources(rName) = t
def resource(resourceName: String, amount: Double): this.type = {
val treq = new TaskResourceRequest(resourceName, amount)
_taskResources.put(resourceName, treq)
this
}
def addRequest(treq: TaskResourceRequest): this.type = {
_taskResources(treq.resourceName) = treq
_taskResources.put(treq.resourceName, treq)
this
}

@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.ExecutorLossReason
import org.apache.spark.util.SerializableBuffer
@ -29,12 +29,13 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
private[spark] object CoarseGrainedClusterMessages {
case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage
case class RetrieveSparkAppConfig(resourceProfileId: Int) extends CoarseGrainedClusterMessage
case class SparkAppConfig(
sparkProperties: Seq[(String, String)],
ioEncryptionKey: Option[Array[Byte]],
hadoopDelegationCreds: Option[Array[Byte]])
hadoopDelegationCreds: Option[Array[Byte]],
resourceProfile: ResourceProfile)
extends CoarseGrainedClusterMessage
case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
@ -59,7 +60,8 @@ private[spark] object CoarseGrainedClusterMessages {
cores: Int,
logUrls: Map[String, String],
attributes: Map[String, String],
resources: Map[String, ResourceInformation])
resources: Map[String, ResourceInformation],
resourceProfileId: Int)
extends CoarseGrainedClusterMessage
case class LaunchedExecutor(executorId: String) extends CoarseGrainedClusterMessage

@ -33,7 +33,7 @@ import org.apache.spark.executor.ExecutorLogUrlHandler
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network._
import org.apache.spark.resource.ResourceRequirement
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@ -205,7 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
attributes, resources) =>
attributes, resources, resourceProfileId) =>
if (executorDataMap.contains(executorId)) {
context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
} else if (scheduler.nodeBlacklist.contains(hostname) ||
@ -236,7 +236,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
val data = new ExecutorData(executorRef, executorAddress, hostname,
0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
resourcesInfo)
resourcesInfo, resourceProfileId)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
@ -270,11 +270,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
removeWorker(workerId, host, message)
context.reply(true)
case RetrieveSparkAppConfig =>
case RetrieveSparkAppConfig(resourceProfileId) =>
// note this will be updated in later prs to get the ResourceProfile from a
// ResourceProfileManager that matches the resource profile id
// for now just use default profile
val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
Option(delegationTokens.get()))
Option(delegationTokens.get()),
rp)
context.reply(reply)
}
@ -570,6 +575,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty)
}
// this function is for testing only
def getExecutorResourceProfileId(executorId: String): Int = synchronized {
val res = executorDataMap.get(executorId)
res.map(_.resourceProfileId).getOrElse(ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
}
/**
* Request an additional number of executors from the cluster manager.
* @return whether the request is acknowledged.

@ -29,6 +29,7 @@ import org.apache.spark.scheduler.ExecutorResourceInfo
* @param freeCores The current number of cores available for work on the executor
* @param totalCores The total number of cores available to the executor
* @param resourcesInfo The information of the currently available resources on the executor
* @param resourceProfileId The id of the ResourceProfile being used by this executor
*/
private[cluster] class ExecutorData(
val executorEndpoint: RpcEndpointRef,
@ -38,5 +39,7 @@ private[cluster] class ExecutorData(
override val totalCores: Int,
override val logUrlMap: Map[String, String],
override val attributes: Map[String, String],
override val resourcesInfo: Map[String, ExecutorResourceInfo]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes, resourcesInfo)
override val resourcesInfo: Map[String, ExecutorResourceInfo],
override val resourceProfileId: Int
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
resourcesInfo, resourceProfileId)

@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
/**
* :: DeveloperApi ::
@ -25,14 +26,15 @@ import org.apache.spark.resource.ResourceInformation
*/
@DeveloperApi
class ExecutorInfo(
val executorHost: String,
val totalCores: Int,
val logUrlMap: Map[String, String],
val attributes: Map[String, String],
val resourcesInfo: Map[String, ResourceInformation]) {
val executorHost: String,
val totalCores: Int,
val logUrlMap: Map[String, String],
val attributes: Map[String, String],
val resourcesInfo: Map[String, ResourceInformation],
val resourceProfileId: Int) {
def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String]) = {
this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty)
this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
}
def this(
@ -40,7 +42,17 @@ class ExecutorInfo(
totalCores: Int,
logUrlMap: Map[String, String],
attributes: Map[String, String]) = {
this(executorHost, totalCores, logUrlMap, attributes, Map.empty)
this(executorHost, totalCores, logUrlMap, attributes, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
}
def this(
executorHost: String,
totalCores: Int,
logUrlMap: Map[String, String],
attributes: Map[String, String],
resourcesInfo: Map[String, ResourceInformation]) = {
this(executorHost, totalCores, logUrlMap, attributes, resourcesInfo,
DEFAULT_RESOURCE_PROFILE_ID)
}
def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@ -52,12 +64,14 @@ class ExecutorInfo(
totalCores == that.totalCores &&
logUrlMap == that.logUrlMap &&
attributes == that.attributes &&
resourcesInfo == that.resourcesInfo
resourcesInfo == that.resourcesInfo &&
resourceProfileId == that.resourceProfileId
case _ => false
}
override def hashCode(): Int = {
val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo)
val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo,
resourceProfileId)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}

@ -26,6 +26,7 @@ import scala.collection.mutable
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
import org.apache.spark.scheduler._
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Clock
@ -52,6 +53,7 @@ private[spark] class ExecutorMonitor(
conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING)
private val executors = new ConcurrentHashMap[String, Tracker]()
private val execResourceProfileCount = new ConcurrentHashMap[Int, Int]()
// The following fields are an optimization to avoid having to scan all executors on every EAM
// schedule interval to find out which ones are timed out. They keep track of when the next
@ -92,6 +94,7 @@ private[spark] class ExecutorMonitor(
def reset(): Unit = {
executors.clear()
execResourceProfileCount.clear()
nextTimeout.set(Long.MaxValue)
timedOutExecs = Nil
}
@ -148,8 +151,25 @@ private[spark] class ExecutorMonitor(
def executorCount: Int = executors.size()
def executorCountWithResourceProfile(id: Int): Int = {
execResourceProfileCount.getOrDefault(id, 0)
}
def getResourceProfileId(executorId: String): Int = {
val execTrackingInfo = executors.get(executorId)
if (execTrackingInfo != null) {
execTrackingInfo.resourceProfileId
} else {
UNKNOWN_RESOURCE_PROFILE_ID
}
}
def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval }
def pendingRemovalCountPerResourceProfileId(id: Int): Int = {
executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size
}
override def onJobStart(event: SparkListenerJobStart): Unit = {
if (!shuffleTrackingEnabled) {
return
@ -261,7 +281,7 @@ private[spark] class ExecutorMonitor(
val executorId = event.taskInfo.executorId
// Guard against a late arriving task start event (SPARK-26927).
if (client.isExecutorActive(executorId)) {
val exec = ensureExecutorIsTracked(executorId)
val exec = ensureExecutorIsTracked(executorId, UNKNOWN_RESOURCE_PROFILE_ID)
exec.updateRunningTasks(1)
}
}
@ -290,15 +310,21 @@ private[spark] class ExecutorMonitor(
}
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
val exec = ensureExecutorIsTracked(event.executorId)
val exec = ensureExecutorIsTracked(event.executorId, event.executorInfo.resourceProfileId)
exec.updateRunningTasks(0)
logInfo(s"New executor ${event.executorId} has registered (new total is ${executors.size()})")
}
private def decrementExecResourceProfileCount(rpId: Int): Unit = {
val count = execResourceProfileCount.getOrDefault(rpId, 0)
execResourceProfileCount.replace(rpId, count, count - 1)
execResourceProfileCount.remove(rpId, 0)
}
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
val removed = executors.remove(event.executorId)
if (removed != null) {
logInfo(s"Executor ${event.executorId} removed (new total is ${executors.size()})")
decrementExecResourceProfileCount(removed.resourceProfileId)
if (!removed.pendingRemoval) {
nextTimeout.set(Long.MinValue)
}
@ -309,8 +335,8 @@ private[spark] class ExecutorMonitor(
if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
return
}
val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId)
val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
UNKNOWN_RESOURCE_PROFILE_ID)
val storageLevel = event.blockUpdatedInfo.storageLevel
val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]
@ -392,8 +418,26 @@ private[spark] class ExecutorMonitor(
* which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
* event, which is possible because these events are posted in different threads. (see SPARK-4951)
*/
private def ensureExecutorIsTracked(id: String): Tracker = {
executors.computeIfAbsent(id, _ => new Tracker())
private def ensureExecutorIsTracked(id: String, resourceProfileId: Int): Tracker = {
val numExecsWithRpId = execResourceProfileCount.computeIfAbsent(resourceProfileId, _ => 0)
val execTracker = executors.computeIfAbsent(id, _ => {
val newcount = numExecsWithRpId + 1
execResourceProfileCount.put(resourceProfileId, newcount)
logDebug(s"Executor added with ResourceProfile id: $resourceProfileId " +
s"count is now $newcount")
new Tracker(resourceProfileId)
})
// if we had added executor before without knowing the resource profile id, fix it up
if (execTracker.resourceProfileId == UNKNOWN_RESOURCE_PROFILE_ID &&
resourceProfileId != UNKNOWN_RESOURCE_PROFILE_ID) {
logDebug(s"Executor: $id, resource profile id was unknown, setting " +
s"it to $resourceProfileId")
execTracker.resourceProfileId = resourceProfileId
// fix up the counts for each resource profile id
execResourceProfileCount.put(resourceProfileId, numExecsWithRpId + 1)
decrementExecResourceProfileCount(UNKNOWN_RESOURCE_PROFILE_ID)
}
execTracker
}
private def updateNextTimeout(newValue: Long): Unit = {
@ -413,7 +457,7 @@ private[spark] class ExecutorMonitor(
}
}
private class Tracker {
private class Tracker(var resourceProfileId: Int) {
@volatile var timeoutAt: Long = Long.MaxValue
// Tracks whether this executor is thought to be timed out. It's used to detect when the list

@ -35,7 +35,7 @@ public class JavaResourceProfileSuite {
ExecutorResourceRequests execReqFpga =
new ExecutorResourceRequests().resource(FPGAResource, 3, "myfpgascript", "nvidia");
ResourceProfile rprof = new ResourceProfile();
ResourceProfileBuilder rprof = new ResourceProfileBuilder();
rprof.require(execReqGpu);
rprof.require(execReqFpga);
TaskResourceRequests taskReq1 = new TaskResourceRequests().resource(GpuResource, 1);

@ -29,6 +29,7 @@ import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.{Clock, ManualClock, SystemClock}
@ -1018,8 +1019,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
manager
}
private val execInfo = new ExecutorInfo("host1", 1, Map.empty,
Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = {
post(SparkListenerExecutorAdded(0L, id, null))
post(SparkListenerExecutorAdded(0L, id, execInfo))
}
private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = {

@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@ -177,10 +178,10 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty,
Map.empty))
Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty,
Map.empty))
Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)

@ -22,6 +22,8 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
import org.scalatest.Suite
import org.apache.spark.resource.ResourceProfile
/** Manages a local `sc` `SparkContext` variable, correctly stopping it after each test. */
trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
@ -42,6 +44,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
def resetSparkContext(): Unit = {
LocalSparkContext.stop(sc)
ResourceProfile.clearDefaultProfile
sc = null
}

@ -31,6 +31,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.internal.config
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._
@ -505,7 +506,7 @@ class StandaloneDynamicAllocationSuite
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty,
Map.empty, Map.empty)
Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val taskScheduler = mock(classOf[TaskSchedulerImpl])
when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
@ -629,7 +630,7 @@ class StandaloneDynamicAllocationSuite
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
Map.empty)
Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
backend.driverEndpoint.askSync[Boolean](message)
backend.driverEndpoint.send(LaunchedExecutor(id))
}

@ -17,6 +17,7 @@
package org.apache.spark.executor
import java.io.File
import java.net.URL
import java.nio.ByteBuffer
import java.util.Properties
@ -33,7 +34,7 @@ import org.scalatestplus.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.TestUtils._
import org.apache.spark.resource.{ResourceAllocation, ResourceInformation}
import org.apache.spark.resource._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.RpcEnv
@ -49,13 +50,13 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
test("parsing no resources") {
val conf = new SparkConf
conf.set(TASK_GPU_ID.amountConf, "2")
val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
4, Seq.empty[URL], env, None, resourceProfile)
withTempDir { tmpDir =>
val testResourceArgs: JObject = ("" -> "")
val ja = JArray(List(testResourceArgs))
@ -72,12 +73,11 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
test("parsing one resource") {
val conf = new SparkConf
conf.set(EXECUTOR_GPU_ID.amountConf, "2")
conf.set(TASK_GPU_ID.amountConf, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
withTempDir { tmpDir =>
val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(ra))
@ -91,18 +91,27 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
}
}
test("parsing multiple resources resource profile") {
val rpBuilder = new ResourceProfileBuilder
val ereqs = new ExecutorResourceRequests().resource(GPU, 2)
ereqs.resource(FPGA, 3)
val rp = rpBuilder.require(ereqs).build
testParsingMultipleResources(new SparkConf, rp)
}
test("parsing multiple resources") {
val conf = new SparkConf
conf.set(EXECUTOR_GPU_ID.amountConf, "2")
conf.set(TASK_GPU_ID.amountConf, "2")
conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
conf.set(TASK_FPGA_ID.amountConf, "3")
testParsingMultipleResources(conf, ResourceProfile.getOrCreateDefaultProfile(conf))
}
def testParsingMultipleResources(conf: SparkConf, resourceProfile: ResourceProfile) {
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
4, Seq.empty[URL], env, None, resourceProfile)
withTempDir { tmpDir =>
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@ -125,12 +134,11 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
test("error checking parsing resources and executor and task configs") {
val conf = new SparkConf
conf.set(EXECUTOR_GPU_ID.amountConf, "2")
conf.set(TASK_GPU_ID.amountConf, "2")
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
// not enough GPUs on the executor
withTempDir { tmpDir =>
@ -156,20 +164,33 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val parsedResources = backend.parseOrFindResources(Some(f1))
}.getMessage()
assert(error.contains("User is expecting to use resource: gpu, but didn't specify a " +
"discovery script!"))
assert(error.contains("Resource script: to discover gpu doesn't exist!"))
}
}
test("executor resource found less than required resource profile") {
val rpBuilder = new ResourceProfileBuilder
val ereqs = new ExecutorResourceRequests().resource(GPU, 4)
val treqs = new TaskResourceRequests().resource(GPU, 1)
val rp = rpBuilder.require(ereqs).require(treqs).build
testExecutorResourceFoundLessThanRequired(new SparkConf, rp)
}
test("executor resource found less than required") {
val conf = new SparkConf
val conf = new SparkConf()
conf.set(EXECUTOR_GPU_ID.amountConf, "4")
conf.set(TASK_GPU_ID.amountConf, "1")
testExecutorResourceFoundLessThanRequired(conf, ResourceProfile.getOrCreateDefaultProfile(conf))
}
private def testExecutorResourceFoundLessThanRequired(
conf: SparkConf,
resourceProfile: ResourceProfile) = {
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
4, Seq.empty[URL], env, None, resourceProfile)
// executor resources < required
withTempDir { tmpDir =>
@ -189,7 +210,6 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
test("use resource discovery") {
val conf = new SparkConf
conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
conf.set(TASK_FPGA_ID.amountConf, "3")
assume(!(Utils.isWindows))
withTempDir { dir =>
val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
@ -201,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
val parsedResources = backend.parseOrFindResources(None)
@ -212,37 +232,56 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
}
}
test("use resource discovery and allocated file option") {
val conf = new SparkConf
conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
conf.set(TASK_FPGA_ID.amountConf, "3")
test("use resource discovery and allocated file option with resource profile") {
assume(!(Utils.isWindows))
withTempDir { dir =>
val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
"""{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath)
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(gpuArgs))
val f1 = createTempJsonFile(dir, "resources", ja)
val parsedResources = backend.parseOrFindResources(Some(f1))
assert(parsedResources.size === 2)
assert(parsedResources.get(GPU).nonEmpty)
assert(parsedResources.get(GPU).get.name === GPU)
assert(parsedResources.get(GPU).get.addresses.sameElements(Array("0", "1")))
assert(parsedResources.get(FPGA).nonEmpty)
assert(parsedResources.get(FPGA).get.name === FPGA)
assert(parsedResources.get(FPGA).get.addresses.sameElements(Array("f1", "f2", "f3")))
val rpBuilder = new ResourceProfileBuilder
val ereqs = new ExecutorResourceRequests().resource(FPGA, 3, scriptPath)
ereqs.resource(GPU, 2)
val rp = rpBuilder.require(ereqs).build
allocatedFileAndConfigsResourceDiscoveryTestFpga(dir, new SparkConf, rp)
}
}
test("use resource discovery and allocated file option") {
assume(!(Utils.isWindows))
withTempDir { dir =>
val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
"""{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
val conf = new SparkConf
conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath)
conf.set(EXECUTOR_GPU_ID.amountConf, "2")
val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
allocatedFileAndConfigsResourceDiscoveryTestFpga(dir, conf, rp)
}
}
private def allocatedFileAndConfigsResourceDiscoveryTestFpga(
dir: File,
conf: SparkConf,
resourceProfile: ResourceProfile) = {
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None, resourceProfile)
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(gpuArgs))
val f1 = createTempJsonFile(dir, "resources", ja)
val parsedResources = backend.parseOrFindResources(Some(f1))
assert(parsedResources.size === 2)
assert(parsedResources.get(GPU).nonEmpty)
assert(parsedResources.get(GPU).get.name === GPU)
assert(parsedResources.get(GPU).get.addresses.sameElements(Array("0", "1")))
assert(parsedResources.get(FPGA).nonEmpty)
assert(parsedResources.get(FPGA).get.name === FPGA)
assert(parsedResources.get(FPGA).get.addresses.sameElements(Array("f1", "f2", "f3")))
}
test("track allocated resources by taskId") {
val conf = new SparkConf
@ -253,15 +292,16 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
try {
val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
val env = createMockEnv(conf, serializer, Some(rpcEnv))
backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
"host1", "host1", 4, Seq.empty[URL], env, None)
backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
"host1", "host1", 4, Seq.empty[URL], env, None,
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
assert(backend.taskResources.isEmpty)
val taskId = 1000000
// We don't really verify the data, just pass it around.
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19, 1,
mutable.Map.empty, mutable.Map.empty, new Properties,
val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000",
19, 1, mutable.Map.empty, mutable.Map.empty, new Properties,
Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
val serializedTaskDescription = TaskDescription.encode(taskDescription)
backend.executor = mock[Executor]
@ -271,13 +311,15 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
eventually(timeout(10.seconds)) {
assert(backend.taskResources.size == 1)
assert(backend.taskResources(taskId)(GPU).addresses sameElements Array("0", "1"))
val resources = backend.taskResources(taskId)
assert(resources(GPU).addresses sameElements Array("0", "1"))
}
// Update the status of a running task shall not affect `taskResources` map.
backend.statusUpdate(taskId, TaskState.RUNNING, data)
assert(backend.taskResources.size == 1)
assert(backend.taskResources(taskId)(GPU).addresses sameElements Array("0", "1"))
val resources = backend.taskResources(taskId)
assert(resources(GPU).addresses sameElements Array("0", "1"))
// Update the status of a finished task shall remove the entry from `taskResources` map.
backend.statusUpdate(taskId, TaskState.FINISHED, data)
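
Taken together, the tests above show the new executor-side flow: the backend is constructed with the ResourceProfile it was launched for and resolves its resources against that profile (optionally merged with an allocations file) instead of only the global confs. A rough sketch inside the Spark tree, reusing this suite's createMockEnv helper; the discovery script path is illustrative:

import java.net.URL
import org.apache.spark.SparkConf
import org.apache.spark.executor.CoarseGrainedExecutorBackend
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
import org.apache.spark.serializer.JavaSerializer

// A custom profile: 2 GPUs per executor located by a discovery script,
// 1 GPU per task.
val ereqs = new ExecutorResourceRequests().resource("gpu", 2, "/opt/spark/getGpus.sh")
val treqs = new TaskResourceRequests().resource("gpu", 1)
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build

// The profile is now the last constructor argument; parseOrFindResources
// runs the profile's discovery script since no allocations file is given.
val conf = new SparkConf
val env = createMockEnv(conf, new JavaSerializer(conf))
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1",
  "host1", "host1", 4, Seq.empty[URL], env, None, rp)
val resources = backend.parseOrFindResources(None)  // Map(resource name -> ResourceInformation)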


@ -18,72 +18,97 @@
package org.apache.spark.resource
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, SPARK_EXECUTOR_PREFIX}
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
class ResourceProfileSuite extends SparkFunSuite {
override def afterEach() {
try {
ResourceProfile.resetDefaultProfile(new SparkConf)
ResourceProfile.clearDefaultProfile
} finally {
super.afterEach()
}
}
test("Default ResourceProfile") {
val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
assert(rprof.executorResources.size === 2,
"Executor resources should contain cores and memory by default")
assert(rprof.executorResources(ResourceProfile.CORES).amount === 1,
s"Executor resources should have 1 core")
"Executor resources should have 1 core")
assert(rprof.getExecutorCores.get === 1,
"Executor resources should have 1 core")
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 1024,
s"Executor resources should have 1024 memory")
"Executor resources should have 1024 memory")
assert(rprof.executorResources.get(ResourceProfile.PYSPARK_MEM) == None,
"pyspark memory empty if not specified")
assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None,
"overhead memory empty if not specified")
assert(rprof.taskResources.size === 1,
"Task resources should just contain cpus by default")
assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,
s"Task resources should have 1 cpu")
"Task resources should have 1 cpu")
assert(rprof.getTaskCpus.get === 1,
"Task resources should have 1 cpu")
}
test("Default ResourceProfile with app level resources specified") {
val conf = new SparkConf
conf.set(PYSPARK_EXECUTOR_MEMORY.key, "2g")
conf.set(EXECUTOR_MEMORY_OVERHEAD.key, "1g")
conf.set(EXECUTOR_MEMORY.key, "4g")
conf.set(EXECUTOR_CORES.key, "4")
conf.set("spark.task.resource.gpu.amount", "1")
conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.amount", "1")
conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.discoveryScript", "nameOfScript")
val rprof = ResourceProfile.getOrCreateDefaultProfile(conf)
assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val execResources = rprof.executorResources
assert(execResources.size === 3,
assert(execResources.size === 5,
"Executor resources should contain cores, memory, overhead and pyspark memory, and gpu " + execResources)
assert(execResources.contains("gpu"), "Executor resources should have gpu")
assert(rprof.executorResources(ResourceProfile.CORES).amount === 4,
"Executor resources should have 4 cores")
assert(rprof.getExecutorCores.get === 4, "Executor resources should have 4 cores")
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
"Executor resources should have 4096 memory")
assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount == 2048,
"pyspark memory should be set to 2048")
assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount == 1024,
"overhead memory should be set to 1024")
assert(rprof.taskResources.size === 2,
"Task resources should just contain cpus and gpu")
assert(execResources.contains("resource.gpu"), "Executor resources should have gpu")
assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
}
test("test default profile task gpus fractional") {
val sparkConf = new SparkConf()
.set("spark.executor.resource.gpu.amount", "2")
.set("spark.task.resource.gpu.amount", "0.33")
val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
assert(immrprof.taskResources.get("gpu").get.amount == 0.33)
}
test("Create ResourceProfile") {
val rprof = new ResourceProfile()
assert(rprof.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
assert(rprof.executorResources === Map.empty)
assert(rprof.taskResources === Map.empty)
val taskReq = new TaskResourceRequests().resource("resource.gpu", 1)
val eReq = new ExecutorResourceRequests().resource("resource.gpu", 2, "myscript", "nvidia")
val rprof = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val eReq = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
rprof.require(taskReq).require(eReq)
assert(rprof.executorResources.size === 1)
assert(rprof.executorResources.contains("resource.gpu"),
assert(rprof.executorResources.contains("gpu"),
"Executor resources should have gpu")
assert(rprof.executorResources.get("resource.gpu").get.vendor === "nvidia",
assert(rprof.executorResources.get("gpu").get.vendor === "nvidia",
"gpu vendor should be nvidia")
assert(rprof.executorResources.get("resource.gpu").get.discoveryScript === "myscript",
assert(rprof.executorResources.get("gpu").get.discoveryScript === "myscript",
"discoveryScript should be myscript")
assert(rprof.executorResources.get("resource.gpu").get.amount === 2,
assert(rprof.executorResources.get("gpu").get.amount === 2,
"gpu amount should be 2")
assert(rprof.taskResources.size === 1, "Should have 1 task resource")
assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
assert(rprof.taskResources.get("resource.gpu").get.amount === 1,
assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
assert(rprof.taskResources.get("gpu").get.amount === 1,
"Task resources should have 1 gpu")
val ereqs = new ExecutorResourceRequests()
@ -97,70 +122,59 @@ class ResourceProfileSuite extends SparkFunSuite {
assert(rprof.executorResources.size === 5)
assert(rprof.executorResources(ResourceProfile.CORES).amount === 2,
s"Executor resources should have 2 cores")
"Executor resources should have 2 cores")
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
s"Executor resources should have 4096 memory")
"Executor resources should have 4096 memory")
assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2048,
s"Executor resources should have 2048 overhead memory")
"Executor resources should have 2048 overhead memory")
assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024,
s"Executor resources should have 1024 pyspark memory")
"Executor resources should have 1024 pyspark memory")
assert(rprof.taskResources.size === 2)
assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
val error = intercept[IllegalArgumentException] {
rprof.require(new ExecutorResourceRequests().resource("bogusResource", 1))
}.getMessage()
assert(error.contains("Executor resource not allowed"))
val taskError = intercept[IllegalArgumentException] {
rprof.require(new TaskResourceRequests().resource("bogusTaskResource", 1))
}.getMessage()
assert(taskError.contains("Task resource not allowed"))
}
test("Test ExecutorResourceRequests memory helpers") {
val rprof = new ResourceProfile()
val rprof = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests()
ereqs.memory("4g")
ereqs.memoryOverhead("2000m").pysparkMemory("512000k")
rprof.require(ereqs)
assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096,
s"Executor resources should have 4096 memory")
"Executor resources should have 4096 memory")
assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2000,
s"Executor resources should have 2000 overhead memory")
"Executor resources should have 2000 overhead memory")
assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500,
s"Executor resources should have 512 pyspark memory")
"Executor resources should have 500 pyspark memory")
}
test("Test TaskResourceRequest fractional") {
val rprof = new ResourceProfile()
val treqs = new TaskResourceRequests().resource("resource.gpu", 0.33)
val rprof = new ResourceProfileBuilder()
val treqs = new TaskResourceRequests().resource("gpu", 0.33)
rprof.require(treqs)
assert(rprof.taskResources.size === 1, "Should have 1 task resource")
assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu")
assert(rprof.taskResources.get("resource.gpu").get.amount === 0.33,
assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu")
assert(rprof.taskResources.get("gpu").get.amount === 0.33,
"Task resources should have 0.33 gpu")
val fpgaReqs = new TaskResourceRequests().resource("resource.fpga", 4.0)
val fpgaReqs = new TaskResourceRequests().resource("fpga", 4.0)
rprof.require(fpgaReqs)
assert(rprof.taskResources.size === 2, "Should have 2 task resources")
assert(rprof.taskResources.contains("resource.fpga"), "Task resources should have gpu")
assert(rprof.taskResources.get("resource.fpga").get.amount === 4.0,
assert(rprof.taskResources.contains("fpga"), "Task resources should have fpga")
assert(rprof.taskResources.get("fpga").get.amount === 4.0,
"Task resources should have 4.0 fpga")
var taskError = intercept[AssertionError] {
rprof.require(new TaskResourceRequests().resource("resource.gpu", 1.5))
rprof.require(new TaskResourceRequests().resource("gpu", 1.5))
}.getMessage()
assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number."))
taskError = intercept[AssertionError] {
rprof.require(new TaskResourceRequests().resource("resource.gpu", 0.7))
rprof.require(new TaskResourceRequests().resource("gpu", 0.7))
}.getMessage()
assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
}
}
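
One rule the suite above pins down: task resource amounts may be fractional so that several tasks can share one address, but only values at or below 0.5 or whole numbers are accepted. A small illustrative sketch of what the builder accepts and rejects under that rule:

import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val builder = new ResourceProfileBuilder()

// 0.33 of a GPU per task is fine: three tasks can share one GPU address.
builder.require(new TaskResourceRequests().resource("gpu", 0.33))
assert(builder.taskResources("gpu").amount == 0.33)

// Whole numbers are also fine.
builder.require(new TaskResourceRequests().resource("fpga", 2.0))

// Fractions above 0.5 are rejected (AssertionError), since two tasks could
// not evenly share a single address:
// builder.require(new TaskResourceRequests().resource("gpu", 0.7))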


@ -128,7 +128,8 @@ class ResourceUtilsSuite extends SparkFunSuite
assert(resourcesFromFileOnly(FPGA) === expectedFpgaInfo)
val gpuDiscovery = createTempScriptWithExpectedOutput(
dir, "gpuDiscoveryScript", """{"name": "gpu", "addresses": ["0", "1"]}""")
dir, "gpuDiscoveryScript",
"""{"name": "gpu", "addresses": ["0", "1"]}""")
conf.set(EXECUTOR_GPU_ID.amountConf, "2")
conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery)
val resourcesFromBoth = getOrDiscoverAllResources(
@ -139,6 +140,38 @@ class ResourceUtilsSuite extends SparkFunSuite
}
}
test("get from resources file and discover resource profile remaining") {
val conf = new SparkConf
val rpId = 1
assume(!(Utils.isWindows))
withTempDir { dir =>
implicit val formats = DefaultFormats
val fpgaAddrs = Seq("f1", "f2", "f3")
val fpgaAllocation = ResourceAllocation(EXECUTOR_FPGA_ID, fpgaAddrs)
val resourcesFile = createTempJsonFile(
dir, "resources", Extraction.decompose(Seq(fpgaAllocation)))
val resourcesFromFileOnly = getOrDiscoverAllResourcesForResourceProfile(
Some(resourcesFile),
SPARK_EXECUTOR_PREFIX,
ResourceProfile.getOrCreateDefaultProfile(conf))
val expectedFpgaInfo = new ResourceInformation(FPGA, fpgaAddrs.toArray)
assert(resourcesFromFileOnly(FPGA) === expectedFpgaInfo)
val gpuDiscovery = createTempScriptWithExpectedOutput(
dir, "gpuDiscoveryScript",
"""{"name": "gpu", "addresses": ["0", "1"]}""")
val rpBuilder = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests().resource(GPU, 2, gpuDiscovery)
val treqs = new TaskResourceRequests().resource(GPU, 1)
val rp = rpBuilder.require(ereqs).require(treqs).build
val resourcesFromBoth = getOrDiscoverAllResourcesForResourceProfile(
Some(resourcesFile), SPARK_EXECUTOR_PREFIX, rp)
val expectedGpuInfo = new ResourceInformation(GPU, Array("0", "1"))
assert(resourcesFromBoth(FPGA) === expectedFpgaInfo)
assert(resourcesFromBoth(GPU) === expectedGpuInfo)
}
}
test("list resource ids") {
val conf = new SparkConf
conf.set(DRIVER_GPU_ID.amountConf, "2")
@ -148,7 +181,7 @@ class ResourceUtilsSuite extends SparkFunSuite
conf.set(DRIVER_FPGA_ID.amountConf, "2")
val resourcesMap = listResourceIds(conf, SPARK_DRIVER_PREFIX)
.map{ rId => (rId.resourceName, 1)}.toMap
.map { rId => (rId.resourceName, 1) }.toMap
assert(resourcesMap.size === 2, "should have both GPU and FPGA for resources")
assert(resourcesMap.get(GPU).nonEmpty, "should have GPU")
assert(resourcesMap.get(FPGA).nonEmpty, "should have FPGA")
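
The new helper exercised above resolves an executor's resources against a ResourceProfile rather than raw confs: addresses listed in the cluster manager's allocations file are taken as-is, and anything else the profile asks for is found by running that resource's discovery script from the profile. A sketch under those assumptions (the JSON path and script path are illustrative; ResourceUtils and SPARK_EXECUTOR_PREFIX are Spark-internal):

import org.apache.spark.internal.config.SPARK_EXECUTOR_PREFIX
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}
import org.apache.spark.resource.ResourceUtils.getOrDiscoverAllResourcesForResourceProfile

// Profile requiring 2 GPUs per executor, discovered by a script.
val ereqs = new ExecutorResourceRequests().resource("gpu", 2, "/opt/spark/discoverGpus.sh")
val rp = new ResourceProfileBuilder().require(ereqs).build

// FPGAs already handed out by the cluster manager come straight from the
// allocations file; GPUs are not in the file, so the script is run for them.
val resources = getOrDiscoverAllResourcesForResourceProfile(
  Some("/tmp/executor-resources.json"), SPARK_EXECUTOR_PREFIX, rp)
val fpgaAddrs = resources("fpga").addresses  // from the file
val gpuAddrs = resources("gpu").addresses    // from the discovery script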


@ -34,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@ -173,11 +173,14 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
sc.addSparkListener(listener)
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes,
Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes,
Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty))
RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes,
Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
assert(executorAddedCount === 3)
@ -214,20 +217,25 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
sc.addSparkListener(listener)
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources))
RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources))
RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
backend.driverEndpoint.askSync[Boolean](
RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources))
RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources,
5))
val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100)
val buffer = new SerializableBuffer(bytebuffer)
var execResources = backend.getExecutorAvailableResources("1")
assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
var exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
assert(exec3ResourceProfileId === 5)
val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
"t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],


@ -27,7 +27,9 @@ import org.mockito.Mockito.{doAnswer, mock, when}
import org.apache.spark._
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, UNKNOWN_RESOURCE_PROFILE_ID}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
import org.apache.spark.util.ManualClock
@ -47,6 +49,9 @@ class ExecutorMonitorSuite extends SparkFunSuite {
private var client: ExecutorAllocationClient = _
private var clock: ManualClock = _
private val execInfo = new ExecutorInfo("host1", 1, Map.empty,
Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID)
// List of known executors. Allows easily mocking which executors are alive without
// having to use mockito APIs directly in each test.
private val knownExecs = mutable.HashSet[String]()
@ -64,10 +69,12 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("basic executor timeout") {
knownExecs += "1"
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
assert(monitor.executorCount === 1)
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1)
assert(monitor.getResourceProfileId("1") === DEFAULT_RESOURCE_PROFILE_ID)
}
test("SPARK-4951, SPARK-26927: handle out of order task start events") {
@ -75,26 +82,38 @@ class ExecutorMonitorSuite extends SparkFunSuite {
monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
assert(monitor.executorCount === 1)
assert(monitor.executorCountWithResourceProfile(UNKNOWN_RESOURCE_PROFILE_ID) === 1)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
assert(monitor.executorCount === 1)
assert(monitor.executorCountWithResourceProfile(UNKNOWN_RESOURCE_PROFILE_ID) === 0)
assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1)
assert(monitor.getResourceProfileId("1") === DEFAULT_RESOURCE_PROFILE_ID)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
assert(monitor.executorCount === 2)
assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 2)
assert(monitor.getResourceProfileId("2") === DEFAULT_RESOURCE_PROFILE_ID)
monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "2", null))
assert(monitor.executorCount === 1)
assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1)
knownExecs -= "2"
monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("2", 2)))
assert(monitor.executorCount === 1)
assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1)
monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "1", null))
assert(monitor.executorCount === 0)
assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 0)
}
test("track tasks running on executor") {
knownExecs += "1"
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
assert(!monitor.isExecutorIdle("1"))
@ -117,7 +136,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("use appropriate time out depending on whether blocks are stored") {
knownExecs += "1"
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
@ -139,7 +158,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
}
test("keeps track of stored blocks for each rdd and split") {
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
@ -173,19 +192,19 @@ class ExecutorMonitorSuite extends SparkFunSuite {
knownExecs ++= Set("1", "2", "3")
// start exec 1 at 0s (should idle time out at 60s)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
assert(monitor.isExecutorIdle("1"))
// start exec 2 at 30s, store a block (should idle time out at 150s)
clock.setTime(TimeUnit.SECONDS.toMillis(30))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
monitor.onBlockUpdated(rddUpdate(1, 0, "2"))
assert(monitor.isExecutorIdle("2"))
assert(!monitor.timedOutExecutors(idleDeadline).contains("2"))
// start exec 3 at 60s (should idle timeout at 120s, exec 1 should time out)
clock.setTime(TimeUnit.SECONDS.toMillis(60))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo))
assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1"))
// store block on exec 3 (should now idle time out at 180s)
@ -205,7 +224,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
// First make sure that blocks on disk are counted when no shuffle service is available.
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.DISK_ONLY))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
@ -213,7 +232,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
monitor = new ExecutorMonitor(conf, client, null, clock)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY))
monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.MEMORY_ONLY))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
@ -236,9 +255,9 @@ class ExecutorMonitorSuite extends SparkFunSuite {
test("track executors pending for removal") {
knownExecs ++= Set("1", "2", "3")
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo))
clock.setTime(idleDeadline)
assert(monitor.timedOutExecutors().toSet === Set("1", "2", "3"))
assert(monitor.pendingRemovalCount === 0)
@ -286,7 +305,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2)))
monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4)))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
// First a failed task, to make sure it does not count.
@ -342,7 +361,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
throw new IllegalStateException("No event should be sent.")
}
}
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.shuffleCleaned(0)
}
@ -351,8 +370,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, bus, clock)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
// Two separate jobs with separate shuffles. The first job will only run tasks on
// executor 1, the second on executor 2. Ensures that jobs finishing don't affect
@ -401,7 +420,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
val stage = stageInfo(1, shuffleId = 0)
monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage)))
clock.advance(1000L)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1),
new ExecutorMetrics, null))
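
These monitor changes are the driver-side bookkeeping this change builds toward: every executor-added event now carries an ExecutorInfo that ends with the profile id, and the monitor keeps a running count per profile. A sketch of that accounting, assuming the Spark-internal ExecutorMonitor, clock, and listener events set up in this suite:

import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.scheduler.SparkListenerExecutorAdded
import org.apache.spark.scheduler.cluster.ExecutorInfo

// ExecutorInfo arguments: host, total cores, log URLs, attributes,
// resources, and (new here) the resource profile id.
val info = new ExecutorInfo("host1", 4, Map.empty, Map.empty, Map.empty,
  DEFAULT_RESOURCE_PROFILE_ID)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", info))

// The monitor can now answer how many executors of a given profile are alive,
// which is what dynamic allocation needs to size requests per profile.
assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) == 1)
assert(monitor.getResourceProfileId("1") == DEFAULT_RESOURCE_PROFILE_ID)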


@ -36,6 +36,14 @@ object MimaExcludes {
// Exclude rules for 3.0.x
lazy val v30excludes = v24excludes ++ Seq(
// [SPARK-29306] Add support for Stage level scheduling for executors
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productElement"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productArity"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.canEqual"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productIterator"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productPrefix"),
ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.toString"),
// [SPARK-29399][core] Remove old ExecutorPlugin interface.
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ExecutorPlugin"),


@ -34,6 +34,7 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon
import org.apache.spark.deploy.mesos.{config => mesosConfig}
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor}
@ -716,7 +717,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val mockEndpointRef = mock[RpcEndpointRef]
val mockAddress = mock[RpcAddress]
val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty,
Map.empty, Map.empty)
Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
backend.driverEndpoint.askSync[Boolean](message)
}


@ -48,6 +48,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS
import org.apache.spark.internal.config.UI._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@ -455,7 +456,8 @@ private[spark] class ApplicationMaster(
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}


@ -40,7 +40,8 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.{Utils, YarnContainerInfoHelper}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.util.Utils
private[yarn] class ExecutorRunnable(
container: Option[Container],
@ -53,7 +54,8 @@ private[yarn] class ExecutorRunnable(
executorCores: Int,
appId: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]) extends Logging {
localResources: Map[String, LocalResource],
resourceProfileId: Int) extends Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
@ -72,7 +74,7 @@ private[yarn] class ExecutorRunnable(
s"""
|===============================================================================
|YARN executor launch context:
|Default YARN executor launch context:
| env:
|${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString}
| command:
@ -207,7 +209,8 @@ private[yarn] class ExecutorRunnable(
"--executor-id", executorId,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId) ++
"--app-id", appId,
"--resourceProfileId", resourceProfileId.toString) ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
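
On the YARN side the profile id reaches the executor JVM as a plain command-line flag, so the backend knows which profile's settings to read before it ever registers with the driver. Roughly, the argument list assembled above ends up containing something like the following sketch (ids and hostnames are made up; JVM options, classpath, and log redirection are omitted):

import org.apache.spark.resource.ResourceProfile

// Illustrative flags only, mirroring the Seq built above.
val launchArgs = Seq(
  "--executor-id", "1",
  "--hostname", "worker-host",
  "--cores", "4",
  "--app-id", "application_1579000000000_0001",
  "--resourceProfileId", ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.toString)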


@ -38,6 +38,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@ -565,7 +566,8 @@ private[yarn] class YarnAllocator(
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
).run()
updateInternalState()
} catch {


@ -22,6 +22,7 @@ import java.net.URL
import org.apache.spark.SparkEnv
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.YarnContainerInfoHelper
@ -39,7 +40,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv,
resourcesFile: Option[String])
resourcesFile: Option[String],
resourceProfile: ResourceProfile)
extends CoarseGrainedExecutorBackend(
rpcEnv,
driverUrl,
@ -49,7 +51,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
cores,
userClassPath,
env,
resourcesFile) with Logging {
resourcesFile,
resourceProfile) with Logging {
private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
@ -67,11 +70,11 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt)
arguments.resourcesFileOpt, resourceProfile)
}
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
this.getClass.getCanonicalName.stripSuffix("$"))