From 6dbfa2bb9c5215aab97ec3f86b3325a11a7ff4d1 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Fri, 17 Jan 2020 08:15:25 -0600
Subject: [PATCH] [SPARK-29306][CORE] Stage Level Sched: Executors need to track what ResourceProfile they are created with

### What changes were proposed in this pull request?

This is the second PR for Stage Level Scheduling. It adds the necessary executor-side changes:
1) executors know what ResourceProfile they should be using
2) the executor parses the resource profile settings - these are not in the global configs
3) the executor reports back to the driver which resource profile it was started with

This PR adds all the piping for YARN to pass the information all the way to the executors, but it just uses the default ResourceProfile (which is built from the global application-level configs).

At a high level these changes include:
1) adding a new --resourceProfileId option to the CoarseGrainedExecutorBackend
2) adding the ResourceProfile settings to new internal confs that get passed into the Executor
3) Executor changes that use the resource profile id passed in to read the corresponding ResourceProfile confs, parse those requests, and discover resources as necessary
4) the Executor registers with the Driver using the resource profile id so that the ExecutorMonitor can track how many executors with each profile are running
5) YARN-side changes to show that passing the resource profile id and confs actually works. Just uses the DefaultResourceProfile for now.

I also removed a check from the CoarseGrainedExecutorBackend that used to make sure there were task requirements before parsing any custom resource executor requests. With resource profiles this becomes much more expensive because we would then have to pass the task requests to each executor, and the check was just a shortcut and not really needed. It was much cleaner just to remove it.

Note there were also some changes to ResourceProfile, ExecutorResourceRequests, and TaskResourceRequests in this PR because I discovered some issues with things not being immutable. That API now looks like:

val rpBuilder = new ResourceProfileBuilder()
val ereq = new ExecutorResourceRequests()
val treq = new TaskResourceRequests()

ereq.cores(2).memory("6g").memoryOverhead("2g").pysparkMemory("2g").resource("gpu", 2, "/home/tgraves/getGpus")
treq.cpus(2).resource("gpu", 2)

val resourceProfile = rpBuilder.require(ereq).require(treq).build

This makes it so that ResourceProfile is immutable and Spark can use it directly without worrying about the user changing it.

### Why are the changes needed?

These changes are needed for executors to report which ResourceProfile they are using, so that ultimately the dynamic allocation manager can use that information to know how many executors with a given profile are running and how many more it needs to request. They are also needed to get the resource profile confs to the executor so that it can run the appropriate discovery script if needed.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Unit tests and manually on YARN.

Closes #26682 from tgravescs/SPARK-29306.
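For reference, a slightly expanded, self-contained sketch of the builder usage above. It is illustrative only: as of this PR, ExecutorResourceRequests and TaskResourceRequests are still private[spark], so the sketch assumes it is compiled inside the org.apache.spark.resource package (for example, from a test); the object name and the GPU discovery script path are placeholders, not part of this PR.

package org.apache.spark.resource

// Illustrative only: the request classes are still private[spark] in this PR.
object ResourceProfileBuilderExample {
  def main(args: Array[String]): Unit = {
    // Executor-side requirements; the discovery script path is a placeholder.
    val ereq = new ExecutorResourceRequests()
      .cores(2)
      .memory("6g")
      .memoryOverhead("2g")
      .pysparkMemory("2g")
      .resource("gpu", 2, "/opt/spark/scripts/getGpus")

    // Task-side requirements.
    val treq = new TaskResourceRequests()
      .cpus(2)
      .resource("gpu", 2)

    // build returns an immutable ResourceProfile; mutating the builder afterwards
    // does not affect profiles that were already built.
    val profile = new ResourceProfileBuilder().require(ereq).require(treq).build

    println(s"executor resources: ${profile.executorResources}")
    println(s"task resources: ${profile.taskResources}")
  }
}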
Authored-by: Thomas Graves Signed-off-by: Thomas Graves --- .../CoarseGrainedExecutorBackend.scala | 56 +++---- .../resource/ExecutorResourceRequest.scala | 39 +++-- .../resource/ExecutorResourceRequests.scala | 37 ++-- .../spark/resource/ResourceProfile.scala | 158 +++++++++++------- .../resource/ResourceProfileBuilder.scala | 84 ++++++++++ .../apache/spark/resource/ResourceUtils.scala | 113 +++++++++++-- .../spark/resource/TaskResourceRequest.scala | 20 ++- .../spark/resource/TaskResourceRequests.scala | 25 ++- .../cluster/CoarseGrainedClusterMessage.scala | 10 +- .../CoarseGrainedSchedulerBackend.scala | 21 ++- .../scheduler/cluster/ExecutorData.scala | 7 +- .../scheduler/cluster/ExecutorInfo.scala | 32 +++- .../scheduler/dynalloc/ExecutorMonitor.scala | 60 ++++++- .../resource/JavaResourceProfileSuite.java | 2 +- .../ExecutorAllocationManagerSuite.scala | 6 +- .../apache/spark/HeartbeatReceiverSuite.scala | 5 +- .../org/apache/spark/LocalSparkContext.scala | 3 + .../StandaloneDynamicAllocationSuite.scala | 5 +- .../CoarseGrainedExecutorBackendSuite.scala | 134 ++++++++++----- .../spark/resource/ResourceProfileSuite.scala | 114 +++++++------ .../spark/resource/ResourceUtilsSuite.scala | 37 +++- .../CoarseGrainedSchedulerBackendSuite.scala | 24 ++- .../dynalloc/ExecutorMonitorSuite.scala | 57 ++++--- project/MimaExcludes.scala | 8 + ...osCoarseGrainedSchedulerBackendSuite.scala | 3 +- .../spark/deploy/yarn/ApplicationMaster.scala | 4 +- .../spark/deploy/yarn/ExecutorRunnable.scala | 11 +- .../spark/deploy/yarn/YarnAllocator.scala | 4 +- .../YarnCoarseGrainedExecutorBackend.scala | 13 +- 29 files changed, 756 insertions(+), 336 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 1fe901a83a..f56e7c6d78 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -35,6 +35,8 @@ import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.resource.ResourceProfile._ import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} @@ -51,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend( cores: Int, userClassPath: Seq[URL], env: SparkEnv, - resourcesFileOpt: Option[String]) + resourcesFileOpt: Option[String], + resourceProfile: ResourceProfile) extends IsolatedRpcEndpoint with ExecutorBackend with Logging { import CoarseGrainedExecutorBackend._ @@ -80,7 +83,7 @@ private[spark] class CoarseGrainedExecutorBackend( // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls, - extractAttributes, resources)) + extractAttributes, resources, resourceProfile.id)) }(ThreadUtils.sameThread).onComplete { case Success(_) => self.send(RegisteredExecutor) @@ -91,24 +94,13 @@ private[spark] class CoarseGrainedExecutorBackend( // visible for testing def parseOrFindResources(resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = { - // 
only parse the resources if a task requires them - val resourceInfo = if (parseResourceRequirements(env.conf, SPARK_TASK_PREFIX).nonEmpty) { - val resources = getOrDiscoverAllResources(env.conf, SPARK_EXECUTOR_PREFIX, resourcesFileOpt) - if (resources.isEmpty) { - throw new SparkException("User specified resources per task via: " + - s"$SPARK_TASK_PREFIX, but can't find any resources available on the executor.") - } else { - logResourceInfo(SPARK_EXECUTOR_PREFIX, resources) - } - resources - } else { - if (resourcesFileOpt.nonEmpty) { - logWarning("A resources file was specified but the application is not configured " + - s"to use any resources, see the configs with prefix: ${SPARK_TASK_PREFIX}") - } - Map.empty[String, ResourceInformation] - } - resourceInfo + logDebug(s"Resource profile id is: ${resourceProfile.id}") + val resources = getOrDiscoverAllResourcesForResourceProfile( + resourcesFileOpt, + SPARK_EXECUTOR_PREFIX, + resourceProfile) + logResourceInfo(SPARK_EXECUTOR_PREFIX, resources) + resources } def extractLogUrls: Map[String, String] = { @@ -237,14 +229,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { appId: String, workerUrl: Option[String], userClassPath: mutable.ListBuffer[URL], - resourcesFileOpt: Option[String]) + resourcesFileOpt: Option[String], + resourceProfileId: Int) def main(args: Array[String]): Unit = { - val createFn: (RpcEnv, Arguments, SparkEnv) => - CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) => + val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => + CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) => new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env, - arguments.resourcesFileOpt) + arguments.resourcesFileOpt, resourceProfile) } run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) System.exit(0) @@ -252,7 +245,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { def run( arguments: Arguments, - backendCreateFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend): Unit = { + backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) => + CoarseGrainedExecutorBackend): Unit = { Utils.initDaemon(log) @@ -284,7 +278,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } } - val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig) + val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId)) val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId)) fetcher.shutdown() @@ -307,7 +301,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) - env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env)) + env.rpcEnv.setupEndpoint("Executor", + backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)) arguments.workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } @@ -325,6 +320,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBuffer[URL]() + var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID var argv = 
args.toList while (!argv.isEmpty) { @@ -357,6 +353,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { case ("--user-class-path") :: value :: tail => userClassPath += new URL(value) argv = tail + case ("--resourceProfileId") :: value :: tail => + resourceProfileId = value.toInt + argv = tail case Nil => case tail => // scalastyle:off println @@ -380,7 +379,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl, - userClassPath, resourcesFileOpt) + userClassPath, resourcesFileOpt, resourceProfileId) } private def printUsageAndExit(classNameForEntry: String): Unit = { @@ -399,6 +398,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { | --app-id | --worker-url | --user-class-path + | --resourceProfileId |""".stripMargin) // scalastyle:on println System.exit(1) diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala index 88ceaad69b..9a920914ed 100644 --- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala +++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequest.scala @@ -17,10 +17,6 @@ package org.apache.spark.resource -import scala.collection.mutable - -import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT - /** * An Executor resource request. This is used in conjunction with the ResourceProfile to * programmatically specify the resources needed for an RDD that will be applied at the @@ -28,16 +24,13 @@ import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT * * This is used to specify what the resource requirements are for an Executor and how * Spark can find out specific details about those resources. Not all the parameters are - * required for every resource type. The resources names supported - * correspond to the regular Spark configs with the prefix removed. For instance overhead - * memory in this api is memoryOverhead, which is spark.executor.memoryOverhead with - * spark.executor removed. Resources like GPUs are resource.gpu - * (spark configs spark.executor.resource.gpu.*). The amount, discoveryScript, and vendor - * parameters for resources are all the same parameters a user would specify through the + * required for every resource type. Resources like GPUs are supported and have same limitations + * as using the global spark configs spark.executor.resource.gpu.*. The amount, discoveryScript, + * and vendor parameters for resources are all the same parameters a user would specify through the * configs: spark.executor.resource.{resourceName}.{amount, discoveryScript, vendor}. * * For instance, a user wants to allocate an Executor with GPU resources on YARN. The user has - * to specify the resource name (resource.gpu), the amount or number of GPUs per Executor, + * to specify the resource name (gpu), the amount or number of GPUs per Executor, * the discovery script would be specified so that when the Executor starts up it can * discovery what GPU addresses are available for it to use because YARN doesn't tell * Spark that, then vendor would not be used because its specific for Kubernetes. @@ -63,15 +56,21 @@ private[spark] class ExecutorResourceRequest( val discoveryScript: String = "", val vendor: String = "") extends Serializable { - // A list of allowed Spark internal resources. 
Custom resources (spark.executor.resource.*) - // like GPUs/FPGAs are also allowed, see the check below. - private val allowedExecutorResources = mutable.HashSet[String]( - ResourceProfile.MEMORY, - ResourceProfile.OVERHEAD_MEM, - ResourceProfile.PYSPARK_MEM, - ResourceProfile.CORES) + override def equals(obj: Any): Boolean = { + obj match { + case that: ExecutorResourceRequest => + that.getClass == this.getClass && + that.resourceName == resourceName && that.amount == amount && + that.discoveryScript == discoveryScript && that.vendor == vendor + case _ => + false + } + } - if (!allowedExecutorResources.contains(resourceName) && !resourceName.startsWith(RESOURCE_DOT)) { - throw new IllegalArgumentException(s"Executor resource not allowed: $resourceName") + override def hashCode(): Int = + Seq(resourceName, amount, discoveryScript, vendor).hashCode() + + override def toString(): String = { + s"name: $resourceName, amount: $amount, script: $discoveryScript, vendor: $vendor" } } diff --git a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala index 6ffcc0c296..d345674d66 100644 --- a/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala +++ b/core/src/main/scala/org/apache/spark/resource/ExecutorResourceRequests.scala @@ -17,7 +17,9 @@ package org.apache.spark.resource -import scala.collection.mutable +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ import org.apache.spark.network.util.JavaUtils import org.apache.spark.resource.ResourceProfile._ @@ -32,9 +34,9 @@ import org.apache.spark.resource.ResourceProfile._ */ private[spark] class ExecutorResourceRequests() extends Serializable { - private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]() + private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]() - def requests: Map[String, ExecutorResourceRequest] = _executorResources.toMap + def requests: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap /** * Specify heap memory. The value specified will be converted to MiB. @@ -44,8 +46,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable { */ def memory(amount: String): this.type = { val amountMiB = JavaUtils.byteStringAsMb(amount) - val rr = new ExecutorResourceRequest(MEMORY, amountMiB) - _executorResources(MEMORY) = rr + val req = new ExecutorResourceRequest(MEMORY, amountMiB) + _executorResources.put(MEMORY, req) this } @@ -57,8 +59,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable { */ def memoryOverhead(amount: String): this.type = { val amountMiB = JavaUtils.byteStringAsMb(amount) - val rr = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB) - _executorResources(OVERHEAD_MEM) = rr + val req = new ExecutorResourceRequest(OVERHEAD_MEM, amountMiB) + _executorResources.put(OVERHEAD_MEM, req) this } @@ -70,8 +72,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable { */ def pysparkMemory(amount: String): this.type = { val amountMiB = JavaUtils.byteStringAsMb(amount) - val rr = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB) - _executorResources(PYSPARK_MEM) = rr + val req = new ExecutorResourceRequest(PYSPARK_MEM, amountMiB) + _executorResources.put(PYSPARK_MEM, req) this } @@ -81,15 +83,17 @@ private[spark] class ExecutorResourceRequests() extends Serializable { * @param amount Number of cores to allocate per Executor. 
*/ def cores(amount: Int): this.type = { - val t = new ExecutorResourceRequest(CORES, amount) - _executorResources(CORES) = t + val req = new ExecutorResourceRequest(CORES, amount) + _executorResources.put(CORES, req) this } /** * Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported * correspond to the regular Spark configs with the prefix removed. For instance, resources - * like GPUs are resource.gpu (spark configs spark.executor.resource.gpu.*) + * like GPUs are gpu (spark configs spark.executor.resource.gpu.*). If you pass in a resource + * that the cluster manager doesn't support the result is undefined, it may error or may just + * be ignored. * * @param resourceName Name of the resource. * @param amount amount of that resource per executor to use. @@ -106,13 +110,8 @@ private[spark] class ExecutorResourceRequests() extends Serializable { vendor: String = ""): this.type = { // a bit weird but for Java api use empty string as meaning None because empty // string is otherwise invalid for those paramters anyway - val eReq = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor) - _executorResources(resourceName) = eReq - this - } - - def addRequest(ereq: ExecutorResourceRequest): this.type = { - _executorResources(ereq.resourceName) = ereq + val req = new ExecutorResourceRequest(resourceName, amount, discoveryScript, vendor) + _executorResources.put(resourceName, req) this } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 876a655b13..eb713a27be 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -18,130 +18,164 @@ package org.apache.spark.resource import java.util.{Map => JMap} -import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} +import java.util.concurrent.atomic.AtomicInteger +import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.resource.ResourceUtils.RESOURCE_PREFIX +import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY /** * Resource profile to associate with an RDD. A ResourceProfile allows the user to * specify executor and task requirements for an RDD that will get applied during a * stage. This allows the user to change the resource requirements between stages. - * - * This class is private now for initial development, once we have the feature in place - * this will become public. + * This is meant to be immutable so user can't change it after building. 
*/ @Evolving -private[spark] class ResourceProfile() extends Serializable { +class ResourceProfile( + val executorResources: Map[String, ExecutorResourceRequest], + val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging { - private val _id = ResourceProfile.getNextProfileId - private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]() - private val _executorResources = new mutable.HashMap[String, ExecutorResourceRequest]() + // _id is only a var for testing purposes + private var _id = ResourceProfile.getNextProfileId def id: Int = _id - def taskResources: Map[String, TaskResourceRequest] = _taskResources.toMap - def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.toMap /** * (Java-specific) gets a Java Map of resources to TaskResourceRequest */ - def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asJava + def taskResourcesJMap: JMap[String, TaskResourceRequest] = taskResources.asJava /** * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest */ - def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = _executorResources.asJava - - def reset(): Unit = { - _taskResources.clear() - _executorResources.clear() + def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = { + executorResources.asJava } - def require(requests: ExecutorResourceRequests): this.type = { - _executorResources ++= requests.requests - this + // Note that some cluster managers don't set the executor cores explicitly so + // be sure to check the Option as required + private[spark] def getExecutorCores: Option[Int] = { + executorResources.get(ResourceProfile.CORES).map(_.amount.toInt) } - def require(requests: TaskResourceRequests): this.type = { - _taskResources ++= requests.requests - this + private[spark] def getTaskCpus: Option[Int] = { + taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt) } + // testing only + private[spark] def setToDefaultProfile(): Unit = { + _id = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID + } + + override def equals(obj: Any): Boolean = { + obj match { + case that: ResourceProfile => + that.getClass == this.getClass && that.id == _id && + that.taskResources == taskResources && that.executorResources == executorResources + case _ => + false + } + } + + override def hashCode(): Int = Seq(taskResources, executorResources).hashCode() + override def toString(): String = { - s"Profile: id = ${_id}, executor resources: ${_executorResources}, " + - s"task resources: ${_taskResources}" + s"Profile: id = ${_id}, executor resources: ${executorResources.mkString(",")}, " + + s"task resources: ${taskResources.mkString(",")}" } } -private[spark] object ResourceProfile extends Logging { - val UNKNOWN_RESOURCE_PROFILE_ID = -1 - val DEFAULT_RESOURCE_PROFILE_ID = 0 - +object ResourceProfile extends Logging { + // task resources val CPUS = "cpus" + // Executor resources val CORES = "cores" val MEMORY = "memory" val OVERHEAD_MEM = "memoryOverhead" val PYSPARK_MEM = "pyspark.memory" + // all supported spark executor resources (minus the custom resources like GPUs/FPGAs) + val allSupportedExecutorResources = Seq(CORES, MEMORY, OVERHEAD_MEM, PYSPARK_MEM) + + val UNKNOWN_RESOURCE_PROFILE_ID = -1 + val DEFAULT_RESOURCE_PROFILE_ID = 0 + private lazy val nextProfileId = new AtomicInteger(0) + private val DEFAULT_PROFILE_LOCK = new Object() // The default resource profile uses the application level configs. 
- // Create the default profile immediately to get ID 0, its initialized later when fetched. - private val defaultProfileRef: AtomicReference[ResourceProfile] = - new AtomicReference[ResourceProfile](new ResourceProfile()) + // var so that it can be reset for testing purposes. + @GuardedBy("DEFAULT_PROFILE_LOCK") + private var defaultProfile: Option[ResourceProfile] = None - assert(defaultProfileRef.get().id == DEFAULT_RESOURCE_PROFILE_ID, - s"Default Profile must have the default profile id: $DEFAULT_RESOURCE_PROFILE_ID") + private[spark] def getNextProfileId: Int = nextProfileId.getAndIncrement() - def getNextProfileId: Int = nextProfileId.getAndIncrement() - - def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = { - val defaultProf = defaultProfileRef.get() - // check to see if the default profile was initialized yet - if (defaultProf.executorResources == Map.empty) { - synchronized { - val prof = defaultProfileRef.get() - if (prof.executorResources == Map.empty) { - addDefaultTaskResources(prof, conf) - addDefaultExecutorResources(prof, conf) - } - prof + private[spark] def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile = { + DEFAULT_PROFILE_LOCK.synchronized { + defaultProfile match { + case Some(prof) => prof + case None => + val taskResources = getDefaultTaskResources(conf) + val executorResources = getDefaultExecutorResources(conf) + val defProf = new ResourceProfile(executorResources, taskResources) + defProf.setToDefaultProfile + defaultProfile = Some(defProf) + logInfo("Default ResourceProfile created, executor resources: " + + s"${defProf.executorResources}, task resources: " + + s"${defProf.taskResources}") + defProf } - } else { - defaultProf } } - private def addDefaultTaskResources(rprof: ResourceProfile, conf: SparkConf): Unit = { + private def getDefaultTaskResources(conf: SparkConf): Map[String, TaskResourceRequest] = { val cpusPerTask = conf.get(CPUS_PER_TASK) val treqs = new TaskResourceRequests().cpus(cpusPerTask) - val taskReq = ResourceUtils.parseResourceRequirements(conf, SPARK_TASK_PREFIX) - taskReq.foreach { req => - val name = s"${RESOURCE_PREFIX}.${req.resourceName}" - treqs.resource(name, req.amount) - } - rprof.require(treqs) + ResourceUtils.addTaskResourceRequests(conf, treqs) + treqs.requests } - private def addDefaultExecutorResources(rprof: ResourceProfile, conf: SparkConf): Unit = { + private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = { val ereqs = new ExecutorResourceRequests() ereqs.cores(conf.get(EXECUTOR_CORES)) ereqs.memory(conf.get(EXECUTOR_MEMORY).toString) + conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString)) + conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString)) val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX) execReq.foreach { req => - val name = s"${RESOURCE_PREFIX}.${req.id.resourceName}" + val name = req.id.resourceName ereqs.resource(name, req.amount, req.discoveryScript.getOrElse(""), req.vendor.getOrElse("")) } - rprof.require(ereqs) + ereqs.requests } - // for testing purposes - def resetDefaultProfile(conf: SparkConf): Unit = getOrCreateDefaultProfile(conf).reset() + // for testing only + private[spark] def reInitDefaultProfile(conf: SparkConf): Unit = { + clearDefaultProfile + // force recreate it after clearing + getOrCreateDefaultProfile(conf) + } + + // for testing only + private[spark] def clearDefaultProfile: Unit = { + DEFAULT_PROFILE_LOCK.synchronized { + defaultProfile 
= None + } + } + + private[spark] def getCustomTaskResources( + rp: ResourceProfile): Map[String, TaskResourceRequest] = { + rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)) + } + + private[spark] def getCustomExecutorResources( + rp: ResourceProfile): Map[String, ExecutorResourceRequest] = { + rp.executorResources.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)) + } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala new file mode 100644 index 0000000000..0d55c176ee --- /dev/null +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.resource + +import java.util.{Map => JMap} +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Evolving + +/** + * Resource profile builder to build a Resource profile to associate with an RDD. + * A ResourceProfile allows the user to specify executor and task requirements for an RDD + * that will get applied during a stage. This allows the user to change the resource + * requirements between stages. 
+ */ +@Evolving +class ResourceProfileBuilder() { + + private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]() + private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]() + + def taskResources: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap + def executorResources: Map[String, ExecutorResourceRequest] = _executorResources.asScala.toMap + + /** + * (Java-specific) gets a Java Map of resources to TaskResourceRequest + */ + def taskResourcesJMap: JMap[String, TaskResourceRequest] = _taskResources.asScala.asJava + + /** + * (Java-specific) gets a Java Map of resources to ExecutorResourceRequest + */ + def executorResourcesJMap: JMap[String, ExecutorResourceRequest] = { + _executorResources.asScala.asJava + } + + def require(requests: ExecutorResourceRequests): this.type = { + _executorResources.putAll(requests.requests.asJava) + this + } + + def require(requests: TaskResourceRequests): this.type = { + _taskResources.putAll(requests.requests.asJava) + this + } + + def clearExecutorResourceRequests(): this.type = { + _executorResources.clear() + this + } + + def clearTaskResourceRequests(): this.type = { + _taskResources.clear() + this + } + + override def toString(): String = { + "Profile executor resources: " + + s"${_executorResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}, " + + s"task resources: ${_taskResources.asScala.map(pair => s"${pair._1}=${pair._2.toString()}")}" + } + + def build: ResourceProfile = { + new ResourceProfile(executorResources, taskResources) + } +} + diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index fdd5c9a84c..190b0cdc88 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -111,7 +111,7 @@ private[spark] object ResourceUtils extends Logging { } def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = { - sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_DOT").map { case (key, _) => + sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_PREFIX.").map { case (key, _) => key.substring(0, key.indexOf('.')) }.toSet.toSeq.map(name => ResourceID(componentName, name)) } @@ -124,6 +124,35 @@ private[spark] object ResourceUtils extends Logging { .filter(_.amount > 0) } + // Used to take a fraction amount from a task resource requirement and split into a real + // integer amount and the number of parts expected. For instance, if the amount is 0.5, + // the we get (1, 2) back out. 
+ // Returns tuple of (amount, numParts) + def calculateAmountAndPartsForFraction(amount: Double): (Int, Int) = { + val parts = if (amount <= 0.5) { + Math.floor(1.0 / amount).toInt + } else if (amount % 1 != 0) { + throw new SparkException( + s"The resource amount ${amount} must be either <= 0.5, or a whole number.") + } else { + 1 + } + (Math.ceil(amount).toInt, parts) + } + + // Add any task resource requests from the spark conf to the TaskResourceRequests passed in + def addTaskResourceRequests( + sparkConf: SparkConf, + treqs: TaskResourceRequests): Unit = { + listResourceIds(sparkConf, SPARK_TASK_PREFIX).map { resourceId => + val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap + val amountDouble = settings.getOrElse(AMOUNT, + throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}") + ).toDouble + treqs.resource(resourceId.resourceName, amountDouble) + } + } + def parseResourceRequirements(sparkConf: SparkConf, componentName: String) : Seq[ResourceRequirement] = { val resourceIds = listResourceIds(sparkConf, componentName) @@ -136,15 +165,7 @@ private[spark] object ResourceUtils extends Logging { } rnamesAndAmounts.filter { case (_, amount) => amount > 0 }.map { case (rName, amountDouble) => val (amount, parts) = if (componentName.equalsIgnoreCase(SPARK_TASK_PREFIX)) { - val parts = if (amountDouble <= 0.5) { - Math.floor(1.0 / amountDouble).toInt - } else if (amountDouble % 1 != 0) { - throw new SparkException( - s"The resource amount ${amountDouble} must be either <= 0.5, or a whole number.") - } else { - 1 - } - (Math.ceil(amountDouble).toInt, parts) + calculateAmountAndPartsForFraction(amountDouble) } else if (amountDouble % 1 != 0) { throw new SparkException( s"Only tasks support fractional resources, please check your $componentName settings") @@ -181,12 +202,18 @@ private[spark] object ResourceUtils extends Logging { } } + def parseAllocated( + resourcesFileOpt: Option[String], + componentName: String): Seq[ResourceAllocation] = { + resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile) + .filter(_.id.componentName == componentName) + } + private def parseAllocatedOrDiscoverResources( sparkConf: SparkConf, componentName: String, resourcesFileOpt: Option[String]): Seq[ResourceAllocation] = { - val allocated = resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile) - .filter(_.id.componentName == componentName) + val allocated = parseAllocated(resourcesFileOpt, componentName) val otherResourceIds = listResourceIds(sparkConf, componentName).diff(allocated.map(_.id)) val otherResources = otherResourceIds.flatMap { id => val request = parseResourceRequest(sparkConf, id) @@ -215,9 +242,24 @@ private[spark] object ResourceUtils extends Logging { requests.foreach(r => assertResourceAllocationMeetsRequest(allocated(r.id), r)) } + private def assertAllResourceAllocationsMatchResourceProfile( + allocations: Map[String, ResourceInformation], + execReqs: Map[String, ExecutorResourceRequest]): Unit = { + execReqs.foreach { case (rName, req) => + require(allocations.contains(rName) && allocations(rName).addresses.size >= req.amount, + s"Resource: ${rName}, with addresses: " + + s"${allocations(rName).addresses.mkString(",")} " + + s"is less than what the user requested: ${req.amount})") + } + } + /** * Gets all allocated resource information for the input component from input resources file and - * discover the remaining via discovery scripts. + * the application level Spark configs. 
It first looks to see if resource were explicitly + * specified in the resources file (this would include specified address assignments and it only + * specified in certain cluster managers) and then it looks at the Spark configs to get any + * others not specified in the file. The resources not explicitly set in the file require a + * discovery script for it to run to get the addresses of the resource. * It also verifies the resource allocation meets required amount for each resource. * @return a map from resource name to resource info */ @@ -232,6 +274,37 @@ private[spark] object ResourceUtils extends Logging { resourceInfoMap } + /** + * This function is similar to getOrDiscoverallResources, except for it uses the ResourceProfile + * information instead of the application level configs. + * + * It first looks to see if resource were explicitly specified in the resources file + * (this would include specified address assignments and it only specified in certain + * cluster managers) and then it looks at the ResourceProfile to get + * any others not specified in the file. The resources not explicitly set in the file require a + * discovery script for it to run to get the addresses of the resource. + * It also verifies the resource allocation meets required amount for each resource. + * + * @return a map from resource name to resource info + */ + def getOrDiscoverAllResourcesForResourceProfile( + resourcesFileOpt: Option[String], + componentName: String, + resourceProfile: ResourceProfile): Map[String, ResourceInformation] = { + val fileAllocated = parseAllocated(resourcesFileOpt, componentName) + val fileAllocResMap = fileAllocated.map(a => (a.id.resourceName, a.toResourceInformation)).toMap + // only want to look at the ResourceProfile for resources not in the resources file + val execReq = ResourceProfile.getCustomExecutorResources(resourceProfile) + val filteredExecreq = execReq.filterNot { case (rname, _) => fileAllocResMap.contains(rname) } + val rpAllocations = filteredExecreq.map { case (rName, execRequest) => + val addrs = discoverResource(rName, Option(execRequest.discoveryScript)).addresses + (rName, new ResourceInformation(rName, addrs)) + } + val allAllocations = fileAllocResMap ++ rpAllocations + assertAllResourceAllocationsMatchResourceProfile(allAllocations, execReq) + allAllocations + } + def logResourceInfo(componentName: String, resources: Map[String, ResourceInformation]) : Unit = { logInfo("==============================================================") @@ -240,9 +313,9 @@ private[spark] object ResourceUtils extends Logging { } // visible for test - private[spark] def discoverResource(resourceRequest: ResourceRequest): ResourceInformation = { - val resourceName = resourceRequest.id.resourceName - val script = resourceRequest.discoveryScript + private[spark] def discoverResource( + resourceName: String, + script: Option[String]): ResourceInformation = { val result = if (script.nonEmpty) { val scriptFile = new File(script.get) // check that script exists and try to execute @@ -264,10 +337,16 @@ private[spark] object ResourceUtils extends Logging { result } + // visible for test + private[spark] def discoverResource(resourceRequest: ResourceRequest): ResourceInformation = { + val resourceName = resourceRequest.id.resourceName + val script = resourceRequest.discoveryScript + discoverResource(resourceName, script) + } + // known types of resources final val GPU: String = "gpu" final val FPGA: String = "fpga" final val RESOURCE_PREFIX: String = "resource" - final val 
RESOURCE_DOT: String = s"$RESOURCE_PREFIX." } diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala index 22eda52c42..bffb0a2f52 100644 --- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala +++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala @@ -17,10 +17,6 @@ package org.apache.spark.resource -import scala.collection.mutable - -import org.apache.spark.resource.ResourceUtils.RESOURCE_DOT - /** * A task resource request. This is used in conjuntion with the ResourceProfile to * programmatically specify the resources needed for an RDD that will be applied at the @@ -37,7 +33,19 @@ private[spark] class TaskResourceRequest(val resourceName: String, val amount: D assert(amount <= 0.5 || amount % 1 == 0, s"The resource amount ${amount} must be either <= 0.5, or a whole number.") - if (!resourceName.equals(ResourceProfile.CPUS) && !resourceName.startsWith(RESOURCE_DOT)) { - throw new IllegalArgumentException(s"Task resource not allowed: $resourceName") + override def equals(obj: Any): Boolean = { + obj match { + case that: TaskResourceRequest => + that.getClass == this.getClass && + that.resourceName == resourceName && that.amount == amount + case _ => + false + } + } + + override def hashCode(): Int = Seq(resourceName, amount).hashCode() + + override def toString(): String = { + s"name: $resourceName, amount: $amount" } } diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala index 21cbc5d805..9624b51dd1 100644 --- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala +++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala @@ -17,10 +17,11 @@ package org.apache.spark.resource -import scala.collection.mutable +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ import org.apache.spark.resource.ResourceProfile._ -import org.apache.spark.resource.ResourceUtils._ /** * A set of task resource requests. This is used in conjuntion with the ResourceProfile to @@ -32,9 +33,9 @@ import org.apache.spark.resource.ResourceUtils._ */ private[spark] class TaskResourceRequests() extends Serializable { - private val _taskResources = new mutable.HashMap[String, TaskResourceRequest]() + private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]() - def requests: Map[String, TaskResourceRequest] = _taskResources.toMap + def requests: Map[String, TaskResourceRequest] = _taskResources.asScala.toMap /** * Specify number of cpus per Task. @@ -42,15 +43,13 @@ private[spark] class TaskResourceRequests() extends Serializable { * @param amount Number of cpus to allocate per Task. */ def cpus(amount: Int): this.type = { - val t = new TaskResourceRequest(CPUS, amount) - _taskResources(CPUS) = t + val treq = new TaskResourceRequest(CPUS, amount) + _taskResources.put(CPUS, treq) this } /** - * Amount of a particular custom resource(GPU, FPGA, etc) to use. The resource names supported - * correspond to the regular Spark configs with the prefix removed. For instance, resources - * like GPUs are resource.gpu (spark configs spark.task.resource.gpu.*) + * Amount of a particular custom resource(GPU, FPGA, etc) to use. * * @param resourceName Name of the resource. * @param amount Amount requesting as a Double to support fractional resource requests. 
@@ -58,14 +57,14 @@ private[spark] class TaskResourceRequests() extends Serializable { * lets you configure X number of tasks to run on a single resource, * ie amount equals 0.5 translates into 2 tasks per resource address. */ - def resource(rName: String, amount: Double): this.type = { - val t = new TaskResourceRequest(rName, amount) - _taskResources(rName) = t + def resource(resourceName: String, amount: Double): this.type = { + val treq = new TaskResourceRequest(resourceName, amount) + _taskResources.put(resourceName, treq) this } def addRequest(treq: TaskResourceRequest): this.type = { - _taskResources(treq.resourceName) = treq + _taskResources.put(treq.resourceName, treq) this } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 57317e7f6a..283390814a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState -import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.ExecutorLossReason import org.apache.spark.util.SerializableBuffer @@ -29,12 +29,13 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable private[spark] object CoarseGrainedClusterMessages { - case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage + case class RetrieveSparkAppConfig(resourceProfileId: Int) extends CoarseGrainedClusterMessage case class SparkAppConfig( sparkProperties: Seq[(String, String)], ioEncryptionKey: Option[Array[Byte]], - hadoopDelegationCreds: Option[Array[Byte]]) + hadoopDelegationCreds: Option[Array[Byte]], + resourceProfile: ResourceProfile) extends CoarseGrainedClusterMessage case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage @@ -59,7 +60,8 @@ private[spark] object CoarseGrainedClusterMessages { cores: Int, logUrls: Map[String, String], attributes: Map[String, String], - resources: Map[String, ResourceInformation]) + resources: Map[String, ResourceInformation], + resourceProfileId: Int) extends CoarseGrainedClusterMessage case class LaunchedExecutor(executorId: String) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 031b9afaa1..55f4005ef1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -33,7 +33,7 @@ import org.apache.spark.executor.ExecutorLogUrlHandler import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network._ -import org.apache.spark.resource.ResourceRequirement +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -205,7 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override 
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, - attributes, resources) => + attributes, resources, resourceProfileId) => if (executorDataMap.contains(executorId)) { context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId")) } else if (scheduler.nodeBlacklist.contains(hostname) || @@ -236,7 +236,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } val data = new ExecutorData(executorRef, executorAddress, hostname, 0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes, - resourcesInfo) + resourcesInfo, resourceProfileId) // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { @@ -270,11 +270,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeWorker(workerId, host, message) context.reply(true) - case RetrieveSparkAppConfig => + case RetrieveSparkAppConfig(resourceProfileId) => + // note this will be updated in later prs to get the ResourceProfile from a + // ResourceProfileManager that matches the resource profile id + // for now just use default profile + val rp = ResourceProfile.getOrCreateDefaultProfile(conf) val reply = SparkAppConfig( sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), - Option(delegationTokens.get())) + Option(delegationTokens.get()), + rp) context.reply(reply) } @@ -570,6 +575,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.get(executorId).map(_.resourcesInfo).getOrElse(Map.empty) } + // this function is for testing only + def getExecutorResourceProfileId(executorId: String): Int = synchronized { + val res = executorDataMap.get(executorId) + res.map(_.resourceProfileId).getOrElse(ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID) + } + /** * Request an additional number of executors from the cluster manager. * @return whether the request is acknowledged. 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index 17907d88e5..062146174f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -29,6 +29,7 @@ import org.apache.spark.scheduler.ExecutorResourceInfo * @param freeCores The current number of cores available for work on the executor * @param totalCores The total number of cores available to the executor * @param resourcesInfo The information of the currently available resources on the executor + * @param resourceProfileId The id of the ResourceProfile being used by this executor */ private[cluster] class ExecutorData( val executorEndpoint: RpcEndpointRef, @@ -38,5 +39,7 @@ private[cluster] class ExecutorData( override val totalCores: Int, override val logUrlMap: Map[String, String], override val attributes: Map[String, String], - override val resourcesInfo: Map[String, ExecutorResourceInfo] -) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes, resourcesInfo) + override val resourcesInfo: Map[String, ExecutorResourceInfo], + override val resourceProfileId: Int +) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes, + resourcesInfo, resourceProfileId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala index 5a4ad6e00e..a97b08941b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster import org.apache.spark.annotation.DeveloperApi import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID /** * :: DeveloperApi :: @@ -25,14 +26,15 @@ import org.apache.spark.resource.ResourceInformation */ @DeveloperApi class ExecutorInfo( - val executorHost: String, - val totalCores: Int, - val logUrlMap: Map[String, String], - val attributes: Map[String, String], - val resourcesInfo: Map[String, ResourceInformation]) { + val executorHost: String, + val totalCores: Int, + val logUrlMap: Map[String, String], + val attributes: Map[String, String], + val resourcesInfo: Map[String, ResourceInformation], + val resourceProfileId: Int) { def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String]) = { - this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty) + this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID) } def this( @@ -40,7 +42,17 @@ class ExecutorInfo( totalCores: Int, logUrlMap: Map[String, String], attributes: Map[String, String]) = { - this(executorHost, totalCores, logUrlMap, attributes, Map.empty) + this(executorHost, totalCores, logUrlMap, attributes, Map.empty, DEFAULT_RESOURCE_PROFILE_ID) + } + + def this( + executorHost: String, + totalCores: Int, + logUrlMap: Map[String, String], + attributes: Map[String, String], + resourcesInfo: Map[String, ResourceInformation]) = { + this(executorHost, totalCores, logUrlMap, attributes, resourcesInfo, + DEFAULT_RESOURCE_PROFILE_ID) } def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo] @@ -52,12 +64,14 @@ class ExecutorInfo( totalCores == that.totalCores && logUrlMap == that.logUrlMap && attributes == that.attributes 
&& - resourcesInfo == that.resourcesInfo + resourcesInfo == that.resourcesInfo && + resourceProfileId == that.resourceProfileId case _ => false } override def hashCode(): Int = { - val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo) + val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo, + resourceProfileId) state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 3dfd1eac8c..a24f1902fa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -26,6 +26,7 @@ import scala.collection.mutable import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ import org.apache.spark.storage.RDDBlockId import org.apache.spark.util.Clock @@ -52,6 +53,7 @@ private[spark] class ExecutorMonitor( conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING) private val executors = new ConcurrentHashMap[String, Tracker]() + private val execResourceProfileCount = new ConcurrentHashMap[Int, Int]() // The following fields are an optimization to avoid having to scan all executors on every EAM // schedule interval to find out which ones are timed out. They keep track of when the next @@ -92,6 +94,7 @@ private[spark] class ExecutorMonitor( def reset(): Unit = { executors.clear() + execResourceProfileCount.clear() nextTimeout.set(Long.MaxValue) timedOutExecs = Nil } @@ -148,8 +151,25 @@ private[spark] class ExecutorMonitor( def executorCount: Int = executors.size() + def executorCountWithResourceProfile(id: Int): Int = { + execResourceProfileCount.getOrDefault(id, 0) + } + + def getResourceProfileId(executorId: String): Int = { + val execTrackingInfo = executors.get(executorId) + if (execTrackingInfo != null) { + execTrackingInfo.resourceProfileId + } else { + UNKNOWN_RESOURCE_PROFILE_ID + } + } + def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval } + def pendingRemovalCountPerResourceProfileId(id: Int): Int = { + executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size + } + override def onJobStart(event: SparkListenerJobStart): Unit = { if (!shuffleTrackingEnabled) { return @@ -261,7 +281,7 @@ private[spark] class ExecutorMonitor( val executorId = event.taskInfo.executorId // Guard against a late arriving task start event (SPARK-26927). 
if (client.isExecutorActive(executorId)) { - val exec = ensureExecutorIsTracked(executorId) + val exec = ensureExecutorIsTracked(executorId, UNKNOWN_RESOURCE_PROFILE_ID) exec.updateRunningTasks(1) } } @@ -290,15 +310,21 @@ private[spark] class ExecutorMonitor( } override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { - val exec = ensureExecutorIsTracked(event.executorId) + val exec = ensureExecutorIsTracked(event.executorId, event.executorInfo.resourceProfileId) exec.updateRunningTasks(0) logInfo(s"New executor ${event.executorId} has registered (new total is ${executors.size()})") } + private def decrementExecResourceProfileCount(rpId: Int): Unit = { + val count = execResourceProfileCount.getOrDefault(rpId, 0) + execResourceProfileCount.replace(rpId, count, count - 1) + execResourceProfileCount.remove(rpId, 0) + } + override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { val removed = executors.remove(event.executorId) if (removed != null) { - logInfo(s"Executor ${event.executorId} removed (new total is ${executors.size()})") + decrementExecResourceProfileCount(removed.resourceProfileId) if (!removed.pendingRemoval) { nextTimeout.set(Long.MinValue) } @@ -309,8 +335,8 @@ private[spark] class ExecutorMonitor( if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { return } - - val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId) + val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, + UNKNOWN_RESOURCE_PROFILE_ID) val storageLevel = event.blockUpdatedInfo.storageLevel val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId] @@ -392,8 +418,26 @@ private[spark] class ExecutorMonitor( * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded` * event, which is possible because these events are posted in different threads. (see SPARK-4951) */ - private def ensureExecutorIsTracked(id: String): Tracker = { - executors.computeIfAbsent(id, _ => new Tracker()) + private def ensureExecutorIsTracked(id: String, resourceProfileId: Int): Tracker = { + val numExecsWithRpId = execResourceProfileCount.computeIfAbsent(resourceProfileId, _ => 0) + val execTracker = executors.computeIfAbsent(id, _ => { + val newcount = numExecsWithRpId + 1 + execResourceProfileCount.put(resourceProfileId, newcount) + logDebug(s"Executor added with ResourceProfile id: $resourceProfileId " + + s"count is now $newcount") + new Tracker(resourceProfileId) + }) + // if we had added executor before without knowing the resource profile id, fix it up + if (execTracker.resourceProfileId == UNKNOWN_RESOURCE_PROFILE_ID && + resourceProfileId != UNKNOWN_RESOURCE_PROFILE_ID) { + logDebug(s"Executor: $id, resource profile id was unknown, setting " + + s"it to $resourceProfileId") + execTracker.resourceProfileId = resourceProfileId + // fix up the counts for each resource profile id + execResourceProfileCount.put(resourceProfileId, numExecsWithRpId + 1) + decrementExecResourceProfileCount(UNKNOWN_RESOURCE_PROFILE_ID) + } + execTracker } private def updateNextTimeout(newValue: Long): Unit = { @@ -413,7 +457,7 @@ private[spark] class ExecutorMonitor( } } - private class Tracker { + private class Tracker(var resourceProfileId: Int) { @volatile var timeoutAt: Long = Long.MaxValue // Tracks whether this executor is thought to be timed out. 
It's used to detect when the list diff --git a/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java index 077120724c..bb413c00fb 100644 --- a/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java +++ b/core/src/test/java/org/apache/spark/resource/JavaResourceProfileSuite.java @@ -35,7 +35,7 @@ public class JavaResourceProfileSuite { ExecutorResourceRequests execReqFpga = new ExecutorResourceRequests().resource(FPGAResource, 3, "myfpgascript", "nvidia"); - ResourceProfile rprof = new ResourceProfile(); + ResourceProfileBuilder rprof = new ResourceProfileBuilder(); rprof.require(execReqGpu); rprof.require(execReqFpga); TaskResourceRequests taskReq1 = new TaskResourceRequests().resource(GpuResource, 1); diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 6ae1f197cf..99f3e3b2e4 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.{Clock, ManualClock, SystemClock} @@ -1018,8 +1019,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { manager } + private val execInfo = new ExecutorInfo("host1", 1, Map.empty, + Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID) + private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): Unit = { - post(SparkListenerExecutorAdded(0L, id, null)) + post(SparkListenerExecutorAdded(0L, id, execInfo)) } private def onExecutorRemoved(manager: ExecutorAllocationManager, id: String): Unit = { diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index b468e6fa50..ff0f2f9134 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -177,10 +178,10 @@ class HeartbeatReceiverSuite val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2) fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty, Map.empty, - Map.empty)) + Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)) fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty, Map.empty, - Map.empty)) + Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)) heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) 
addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala index d050ee2c45..1fe12e116d 100644 --- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala @@ -22,6 +22,8 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterEach import org.scalatest.Suite +import org.apache.spark.resource.ResourceProfile + /** Manages a local `sc` `SparkContext` variable, correctly stopping it after each test. */ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite => @@ -42,6 +44,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self def resetSparkContext(): Unit = { LocalSparkContext.stop(sc) + ResourceProfile.clearDefaultProfile sc = null } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index e316da7384..f8b99302c4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.deploy.master.ApplicationInfo import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.Worker import org.apache.spark.internal.config +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster._ @@ -505,7 +506,7 @@ class StandaloneDynamicAllocationSuite val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) val message = RegisterExecutor("one", endpointRef, "blacklisted-host", 10, Map.empty, - Map.empty, Map.empty) + Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) val taskScheduler = mock(classOf[TaskSchedulerImpl]) when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host")) @@ -629,7 +630,7 @@ class StandaloneDynamicAllocationSuite val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty, - Map.empty) + Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) backend.driverEndpoint.askSync[Boolean](message) backend.driverEndpoint.send(LaunchedExecutor(id)) } diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 7e96039ca9..a996fc4a0b 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.executor +import java.io.File import java.net.URL import java.nio.ByteBuffer import java.util.Properties @@ -33,7 +34,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.TestUtils._ -import org.apache.spark.resource.{ResourceAllocation, ResourceInformation} +import org.apache.spark.resource._ import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.RpcEnv @@ 
-49,13 +50,13 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite test("parsing no resources") { val conf = new SparkConf - conf.set(TASK_GPU_ID.amountConf, "2") + val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf) val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None) + 4, Seq.empty[URL], env, None, resourceProfile) withTempDir { tmpDir => val testResourceArgs: JObject = ("" -> "") val ja = JArray(List(testResourceArgs)) @@ -72,12 +73,11 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite test("parsing one resource") { val conf = new SparkConf conf.set(EXECUTOR_GPU_ID.amountConf, "2") - conf.set(TASK_GPU_ID.amountConf, "2") val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None) + 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) withTempDir { tmpDir => val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) val ja = Extraction.decompose(Seq(ra)) @@ -91,18 +91,27 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite } } + test("parsing multiple resources resource profile") { + val rpBuilder = new ResourceProfileBuilder + val ereqs = new ExecutorResourceRequests().resource(GPU, 2) + ereqs.resource(FPGA, 3) + val rp = rpBuilder.require(ereqs).build + testParsingMultipleResources(new SparkConf, rp) + } + test("parsing multiple resources") { val conf = new SparkConf conf.set(EXECUTOR_GPU_ID.amountConf, "2") - conf.set(TASK_GPU_ID.amountConf, "2") conf.set(EXECUTOR_FPGA_ID.amountConf, "3") - conf.set(TASK_FPGA_ID.amountConf, "3") + testParsingMultipleResources(conf, ResourceProfile.getOrCreateDefaultProfile(conf)) + } + def testParsingMultipleResources(conf: SparkConf, resourceProfile: ResourceProfile) { val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None) + 4, Seq.empty[URL], env, None, resourceProfile) withTempDir { tmpDir => val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) @@ -125,12 +134,11 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite test("error checking parsing resources and executor and task configs") { val conf = new SparkConf conf.set(EXECUTOR_GPU_ID.amountConf, "2") - conf.set(TASK_GPU_ID.amountConf, "2") val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None) + 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) // not enough gpu's on the executor withTempDir { tmpDir => @@ -156,20 +164,33 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val parsedResources = backend.parseOrFindResources(Some(f1)) }.getMessage() - assert(error.contains("User is expecting to use resource: gpu, but didn't specify a " + - "discovery 
script!")) + assert(error.contains("Resource script: to discover gpu doesn't exist!")) } } + test("executor resource found less than required resource profile") { + val rpBuilder = new ResourceProfileBuilder + val ereqs = new ExecutorResourceRequests().resource(GPU, 4) + val treqs = new TaskResourceRequests().resource(GPU, 1) + val rp = rpBuilder.require(ereqs).require(treqs).build + testExecutorResourceFoundLessThanRequired(new SparkConf, rp) + } + test("executor resource found less than required") { - val conf = new SparkConf + val conf = new SparkConf() conf.set(EXECUTOR_GPU_ID.amountConf, "4") conf.set(TASK_GPU_ID.amountConf, "1") + testExecutorResourceFoundLessThanRequired(conf, ResourceProfile.getOrCreateDefaultProfile(conf)) + } + + private def testExecutorResourceFoundLessThanRequired( + conf: SparkConf, + resourceProfile: ResourceProfile) = { val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None) + 4, Seq.empty[URL], env, None, resourceProfile) // executor resources < required withTempDir { tmpDir => @@ -189,7 +210,6 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite test("use resource discovery") { val conf = new SparkConf conf.set(EXECUTOR_FPGA_ID.amountConf, "3") - conf.set(TASK_FPGA_ID.amountConf, "3") assume(!(Utils.isWindows)) withTempDir { dir => val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", @@ -201,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // we don't really use this, just need it to get at the parser function val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None) + 4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf)) val parsedResources = backend.parseOrFindResources(None) @@ -212,37 +232,56 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite } } - test("use resource discovery and allocated file option") { - val conf = new SparkConf - conf.set(EXECUTOR_FPGA_ID.amountConf, "3") - conf.set(TASK_FPGA_ID.amountConf, "3") + test("use resource discovery and allocated file option with resource profile") { assume(!(Utils.isWindows)) withTempDir { dir => val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", """{"name": "fpga","addresses":["f1", "f2", "f3"]}""") - conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath) - - val serializer = new JavaSerializer(conf) - val env = createMockEnv(conf, serializer) - - // we don't really use this, just need it to get at the parser function - val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", - 4, Seq.empty[URL], env, None) - val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) - val ja = Extraction.decompose(Seq(gpuArgs)) - val f1 = createTempJsonFile(dir, "resources", ja) - val parsedResources = backend.parseOrFindResources(Some(f1)) - - assert(parsedResources.size === 2) - assert(parsedResources.get(GPU).nonEmpty) - assert(parsedResources.get(GPU).get.name === GPU) - assert(parsedResources.get(GPU).get.addresses.sameElements(Array("0", "1"))) - assert(parsedResources.get(FPGA).nonEmpty) - assert(parsedResources.get(FPGA).get.name === FPGA) - assert(parsedResources.get(FPGA).get.addresses.sameElements(Array("f1", "f2", "f3"))) + val rpBuilder = 
new ResourceProfileBuilder + val ereqs = new ExecutorResourceRequests().resource(FPGA, 3, scriptPath) + ereqs.resource(GPU, 2) + val rp = rpBuilder.require(ereqs).build + allocatedFileAndConfigsResourceDiscoveryTestFpga(dir, new SparkConf, rp) } } + test("use resource discovery and allocated file option") { + assume(!(Utils.isWindows)) + withTempDir { dir => + val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript", + """{"name": "fpga","addresses":["f1", "f2", "f3"]}""") + val conf = new SparkConf + conf.set(EXECUTOR_FPGA_ID.amountConf, "3") + conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath) + conf.set(EXECUTOR_GPU_ID.amountConf, "2") + val rp = ResourceProfile.getOrCreateDefaultProfile(conf) + allocatedFileAndConfigsResourceDiscoveryTestFpga(dir, conf, rp) + } + } + + private def allocatedFileAndConfigsResourceDiscoveryTestFpga( + dir: File, + conf: SparkConf, + resourceProfile: ResourceProfile) = { + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + + // we don't really use this, just need it to get at the parser function + val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1", + 4, Seq.empty[URL], env, None, resourceProfile) + val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1")) + val ja = Extraction.decompose(Seq(gpuArgs)) + val f1 = createTempJsonFile(dir, "resources", ja) + val parsedResources = backend.parseOrFindResources(Some(f1)) + + assert(parsedResources.size === 2) + assert(parsedResources.get(GPU).nonEmpty) + assert(parsedResources.get(GPU).get.name === GPU) + assert(parsedResources.get(GPU).get.addresses.sameElements(Array("0", "1"))) + assert(parsedResources.get(FPGA).nonEmpty) + assert(parsedResources.get(FPGA).get.name === FPGA) + assert(parsedResources.get(FPGA).get.addresses.sameElements(Array("f1", "f2", "f3"))) + } test("track allocated resources by taskId") { val conf = new SparkConf @@ -253,15 +292,16 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite try { val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr) val env = createMockEnv(conf, serializer, Some(rpcEnv)) - backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1", - "host1", "host1", 4, Seq.empty[URL], env, None) + backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1", + "host1", "host1", 4, Seq.empty[URL], env, None, + resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)) assert(backend.taskResources.isEmpty) val taskId = 1000000 // We don't really verify the data, just pass it around. 
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) - val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19, 1, - mutable.Map.empty, mutable.Map.empty, new Properties, + val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", + 19, 1, mutable.Map.empty, mutable.Map.empty, new Properties, Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) val serializedTaskDescription = TaskDescription.encode(taskDescription) backend.executor = mock[Executor] @@ -271,13 +311,15 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription))) eventually(timeout(10.seconds)) { assert(backend.taskResources.size == 1) - assert(backend.taskResources(taskId)(GPU).addresses sameElements Array("0", "1")) + val resources = backend.taskResources(taskId) + assert(resources(GPU).addresses sameElements Array("0", "1")) } // Update the status of a running task shall not affect `taskResources` map. backend.statusUpdate(taskId, TaskState.RUNNING, data) assert(backend.taskResources.size == 1) - assert(backend.taskResources(taskId)(GPU).addresses sameElements Array("0", "1")) + val resources = backend.taskResources(taskId) + assert(resources(GPU).addresses sameElements Array("0", "1")) // Update the status of a finished task shall remove the entry from `taskResources` map. backend.statusUpdate(taskId, TaskState.FINISHED, data) diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala index a087f18b3f..c0637eeeac 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala @@ -18,72 +18,97 @@ package org.apache.spark.resource import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.{EXECUTOR_CORES, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, SPARK_EXECUTOR_PREFIX} +import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY class ResourceProfileSuite extends SparkFunSuite { override def afterEach() { try { - ResourceProfile.resetDefaultProfile(new SparkConf) + ResourceProfile.clearDefaultProfile } finally { super.afterEach() } } - test("Default ResourceProfile") { val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf) assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) assert(rprof.executorResources.size === 2, "Executor resources should contain cores and memory by default") assert(rprof.executorResources(ResourceProfile.CORES).amount === 1, - s"Executor resources should have 1 core") + "Executor resources should have 1 core") + assert(rprof.getExecutorCores.get === 1, + "Executor resources should have 1 core") assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 1024, - s"Executor resources should have 1024 memory") + "Executor resources should have 1024 memory") + assert(rprof.executorResources.get(ResourceProfile.PYSPARK_MEM) == None, + "pyspark memory empty if not specified") + assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None, + "overhead memory empty if not specified") assert(rprof.taskResources.size === 1, "Task resources should just contain cpus by default") assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1, - s"Task resources should have 1 cpu") + "Task resources should have 1 cpu") + assert(rprof.getTaskCpus.get === 1, 
+ "Task resources should have 1 cpu") } test("Default ResourceProfile with app level resources specified") { val conf = new SparkConf + conf.set(PYSPARK_EXECUTOR_MEMORY.key, "2g") + conf.set(EXECUTOR_MEMORY_OVERHEAD.key, "1g") + conf.set(EXECUTOR_MEMORY.key, "4g") + conf.set(EXECUTOR_CORES.key, "4") conf.set("spark.task.resource.gpu.amount", "1") conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.amount", "1") conf.set(s"$SPARK_EXECUTOR_PREFIX.resource.gpu.discoveryScript", "nameOfScript") val rprof = ResourceProfile.getOrCreateDefaultProfile(conf) assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) val execResources = rprof.executorResources - assert(execResources.size === 3, + assert(execResources.size === 5, "Executor resources should contain cores, memory, and gpu " + execResources) + assert(execResources.contains("gpu"), "Executor resources should have gpu") + assert(rprof.executorResources(ResourceProfile.CORES).amount === 4, + "Executor resources should have 4 core") + assert(rprof.getExecutorCores.get === 4, "Executor resources should have 4 core") + assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096, + "Executor resources should have 1024 memory") + assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount == 2048, + "pyspark memory empty if not specified") + assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount == 1024, + "overhead memory empty if not specified") assert(rprof.taskResources.size === 2, "Task resources should just contain cpus and gpu") - assert(execResources.contains("resource.gpu"), "Executor resources should have gpu") - assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu") + assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu") + } + + test("test default profile task gpus fractional") { + val sparkConf = new SparkConf() + .set("spark.executor.resource.gpu.amount", "2") + .set("spark.task.resource.gpu.amount", "0.33") + val immrprof = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + assert(immrprof.taskResources.get("gpu").get.amount == 0.33) } test("Create ResourceProfile") { - val rprof = new ResourceProfile() - assert(rprof.id > ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) - assert(rprof.executorResources === Map.empty) - assert(rprof.taskResources === Map.empty) - - val taskReq = new TaskResourceRequests().resource("resource.gpu", 1) - val eReq = new ExecutorResourceRequests().resource("resource.gpu", 2, "myscript", "nvidia") + val rprof = new ResourceProfileBuilder() + val taskReq = new TaskResourceRequests().resource("gpu", 1) + val eReq = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia") rprof.require(taskReq).require(eReq) assert(rprof.executorResources.size === 1) - assert(rprof.executorResources.contains("resource.gpu"), + assert(rprof.executorResources.contains("gpu"), "Executor resources should have gpu") - assert(rprof.executorResources.get("resource.gpu").get.vendor === "nvidia", + assert(rprof.executorResources.get("gpu").get.vendor === "nvidia", "gpu vendor should be nvidia") - assert(rprof.executorResources.get("resource.gpu").get.discoveryScript === "myscript", + assert(rprof.executorResources.get("gpu").get.discoveryScript === "myscript", "discoveryScript should be myscript") - assert(rprof.executorResources.get("resource.gpu").get.amount === 2, + assert(rprof.executorResources.get("gpu").get.amount === 2, "gpu amount should be 2") assert(rprof.taskResources.size === 1, "Should have 1 task resource") - 
assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu") - assert(rprof.taskResources.get("resource.gpu").get.amount === 1, + assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu") + assert(rprof.taskResources.get("gpu").get.amount === 1, "Task resources should have 1 gpu") val ereqs = new ExecutorResourceRequests() @@ -97,70 +122,59 @@ class ResourceProfileSuite extends SparkFunSuite { assert(rprof.executorResources.size === 5) assert(rprof.executorResources(ResourceProfile.CORES).amount === 2, - s"Executor resources should have 2 cores") + "Executor resources should have 2 cores") assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096, - s"Executor resources should have 4096 memory") + "Executor resources should have 4096 memory") assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2048, - s"Executor resources should have 2048 overhead memory") + "Executor resources should have 2048 overhead memory") assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 1024, - s"Executor resources should have 1024 pyspark memory") + "Executor resources should have 1024 pyspark memory") assert(rprof.taskResources.size === 2) assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu") - - val error = intercept[IllegalArgumentException] { - rprof.require(new ExecutorResourceRequests().resource("bogusResource", 1)) - }.getMessage() - assert(error.contains("Executor resource not allowed")) - - val taskError = intercept[IllegalArgumentException] { - rprof.require(new TaskResourceRequests().resource("bogusTaskResource", 1)) - }.getMessage() - assert(taskError.contains("Task resource not allowed")) } test("Test ExecutorResourceRequests memory helpers") { - val rprof = new ResourceProfile() + val rprof = new ResourceProfileBuilder() val ereqs = new ExecutorResourceRequests() ereqs.memory("4g") ereqs.memoryOverhead("2000m").pysparkMemory("512000k") rprof.require(ereqs) assert(rprof.executorResources(ResourceProfile.MEMORY).amount === 4096, - s"Executor resources should have 4096 memory") + "Executor resources should have 4096 memory") assert(rprof.executorResources(ResourceProfile.OVERHEAD_MEM).amount === 2000, - s"Executor resources should have 2000 overhead memory") + "Executor resources should have 2000 overhead memory") assert(rprof.executorResources(ResourceProfile.PYSPARK_MEM).amount === 500, - s"Executor resources should have 512 pyspark memory") + "Executor resources should have 512 pyspark memory") } test("Test TaskResourceRequest fractional") { - val rprof = new ResourceProfile() - val treqs = new TaskResourceRequests().resource("resource.gpu", 0.33) + val rprof = new ResourceProfileBuilder() + val treqs = new TaskResourceRequests().resource("gpu", 0.33) rprof.require(treqs) assert(rprof.taskResources.size === 1, "Should have 1 task resource") - assert(rprof.taskResources.contains("resource.gpu"), "Task resources should have gpu") - assert(rprof.taskResources.get("resource.gpu").get.amount === 0.33, + assert(rprof.taskResources.contains("gpu"), "Task resources should have gpu") + assert(rprof.taskResources.get("gpu").get.amount === 0.33, "Task resources should have 0.33 gpu") - val fpgaReqs = new TaskResourceRequests().resource("resource.fpga", 4.0) + val fpgaReqs = new TaskResourceRequests().resource("fpga", 4.0) rprof.require(fpgaReqs) assert(rprof.taskResources.size === 2, "Should have 2 task resource") - assert(rprof.taskResources.contains("resource.fpga"), "Task resources 
should have gpu") - assert(rprof.taskResources.get("resource.fpga").get.amount === 4.0, + assert(rprof.taskResources.contains("fpga"), "Task resources should have gpu") + assert(rprof.taskResources.get("fpga").get.amount === 4.0, "Task resources should have 4.0 gpu") var taskError = intercept[AssertionError] { - rprof.require(new TaskResourceRequests().resource("resource.gpu", 1.5)) + rprof.require(new TaskResourceRequests().resource("gpu", 1.5)) }.getMessage() assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number.")) taskError = intercept[AssertionError] { - rprof.require(new TaskResourceRequests().resource("resource.gpu", 0.7)) + rprof.require(new TaskResourceRequests().resource("gpu", 0.7)) }.getMessage() assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number.")) } } - diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala index b6d46d20d7..b809469fd7 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -128,7 +128,8 @@ class ResourceUtilsSuite extends SparkFunSuite assert(resourcesFromFileOnly(FPGA) === expectedFpgaInfo) val gpuDiscovery = createTempScriptWithExpectedOutput( - dir, "gpuDiscoveryScript", """{"name": "gpu", "addresses": ["0", "1"]}""") + dir, "gpuDiscoveryScript", + """{"name": "gpu", "addresses": ["0", "1"]}""") conf.set(EXECUTOR_GPU_ID.amountConf, "2") conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery) val resourcesFromBoth = getOrDiscoverAllResources( @@ -139,6 +140,38 @@ class ResourceUtilsSuite extends SparkFunSuite } } + test("get from resources file and discover resource profile remaining") { + val conf = new SparkConf + val rpId = 1 + assume(!(Utils.isWindows)) + withTempDir { dir => + implicit val formats = DefaultFormats + val fpgaAddrs = Seq("f1", "f2", "f3") + val fpgaAllocation = ResourceAllocation(EXECUTOR_FPGA_ID, fpgaAddrs) + val resourcesFile = createTempJsonFile( + dir, "resources", Extraction.decompose(Seq(fpgaAllocation))) + val resourcesFromFileOnly = getOrDiscoverAllResourcesForResourceProfile( + Some(resourcesFile), + SPARK_EXECUTOR_PREFIX, + ResourceProfile.getOrCreateDefaultProfile(conf)) + val expectedFpgaInfo = new ResourceInformation(FPGA, fpgaAddrs.toArray) + assert(resourcesFromFileOnly(FPGA) === expectedFpgaInfo) + + val gpuDiscovery = createTempScriptWithExpectedOutput( + dir, "gpuDiscoveryScript", + """{"name": "gpu", "addresses": ["0", "1"]}""") + val rpBuilder = new ResourceProfileBuilder() + val ereqs = new ExecutorResourceRequests().resource(GPU, 2, gpuDiscovery) + val treqs = new TaskResourceRequests().resource(GPU, 1) + val rp = rpBuilder.require(ereqs).require(treqs).build + val resourcesFromBoth = getOrDiscoverAllResourcesForResourceProfile( + Some(resourcesFile), SPARK_EXECUTOR_PREFIX, rp) + val expectedGpuInfo = new ResourceInformation(GPU, Array("0", "1")) + assert(resourcesFromBoth(FPGA) === expectedFpgaInfo) + assert(resourcesFromBoth(GPU) === expectedGpuInfo) + } + } + test("list resource ids") { val conf = new SparkConf conf.set(DRIVER_GPU_ID.amountConf, "2") @@ -148,7 +181,7 @@ class ResourceUtilsSuite extends SparkFunSuite conf.set(DRIVER_FPGA_ID.amountConf, "2") val resourcesMap = listResourceIds(conf, SPARK_DRIVER_PREFIX) - .map{ rId => (rId.resourceName, 1)}.toMap + .map { rId => (rId.resourceName, 1) }.toMap 
assert(resourcesMap.size === 2, "should only have GPU for resource") assert(resourcesMap.get(GPU).nonEmpty, "should have GPU") assert(resourcesMap.get(FPGA).nonEmpty, "should have FPGA") diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 29160a3e0f..c063301673 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.rdd.RDD -import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} @@ -173,11 +173,14 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo sc.addSparkListener(listener) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty)) + RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, + Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty)) + RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, + Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, Map.empty)) + RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes, + Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)) sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis) assert(executorAddedCount === 3) @@ -214,20 +217,25 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo sc.addSparkListener(listener) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources)) + RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources)) + RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)) backend.driverEndpoint.askSync[Boolean]( - RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources)) + RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, Map.empty, Map.empty, resources, + 5)) val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf) val bytebuffer = java.nio.ByteBuffer.allocate(frameSize - 100) val buffer = new SerializableBuffer(bytebuffer) var execResources = backend.getExecutorAvailableResources("1") - assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3")) + var exec3ResourceProfileId = backend.getExecutorResourceProfileId("3") + assert(exec3ResourceProfileId === 5) + val taskResources = Map(GPU -> new ResourceInformation(GPU, 
Array("0"))) var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1", "t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long], diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index 1397cb7b39..615389ae5c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -27,7 +27,9 @@ import org.mockito.Mockito.{doAnswer, mock, when} import org.apache.spark._ import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, UNKNOWN_RESOURCE_PROFILE_ID} import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.storage._ import org.apache.spark.util.ManualClock @@ -47,6 +49,9 @@ class ExecutorMonitorSuite extends SparkFunSuite { private var client: ExecutorAllocationClient = _ private var clock: ManualClock = _ + private val execInfo = new ExecutorInfo("host1", 1, Map.empty, + Map.empty, Map.empty, DEFAULT_RESOURCE_PROFILE_ID) + // List of known executors. Allows easily mocking which executors are alive without // having to use mockito APIs directly in each test. private val knownExecs = mutable.HashSet[String]() @@ -64,10 +69,12 @@ class ExecutorMonitorSuite extends SparkFunSuite { test("basic executor timeout") { knownExecs += "1" - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) assert(monitor.executorCount === 1) assert(monitor.isExecutorIdle("1")) assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) + assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1) + assert(monitor.getResourceProfileId("1") === DEFAULT_RESOURCE_PROFILE_ID) } test("SPARK-4951, SPARK-26927: handle out of order task start events") { @@ -75,26 +82,38 @@ class ExecutorMonitorSuite extends SparkFunSuite { monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1))) assert(monitor.executorCount === 1) + assert(monitor.executorCountWithResourceProfile(UNKNOWN_RESOURCE_PROFILE_ID) === 1) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) assert(monitor.executorCount === 1) + assert(monitor.executorCountWithResourceProfile(UNKNOWN_RESOURCE_PROFILE_ID) === 0) + assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1) + assert(monitor.getResourceProfileId("1") === DEFAULT_RESOURCE_PROFILE_ID) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo)) assert(monitor.executorCount === 2) + assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 2) + assert(monitor.getResourceProfileId("2") === DEFAULT_RESOURCE_PROFILE_ID) monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "2", null)) assert(monitor.executorCount === 1) + assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1) knownExecs -= "2" monitor.onTaskStart(SparkListenerTaskStart(1, 1, 
taskInfo("2", 2))) assert(monitor.executorCount === 1) + assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 1) + + monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "1", null)) + assert(monitor.executorCount === 0) + assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 0) } test("track tasks running on executor") { knownExecs += "1" - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1))) assert(!monitor.isExecutorIdle("1")) @@ -117,7 +136,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { test("use appropriate time out depending on whether blocks are stored") { knownExecs += "1" - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) assert(monitor.isExecutorIdle("1")) assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) @@ -139,7 +158,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { } test("keeps track of stored blocks for each rdd and split") { - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "1")) assert(monitor.timedOutExecutors(idleDeadline).isEmpty) @@ -173,19 +192,19 @@ class ExecutorMonitorSuite extends SparkFunSuite { knownExecs ++= Set("1", "2", "3") // start exec 1 at 0s (should idle time out at 60s) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) assert(monitor.isExecutorIdle("1")) // start exec 2 at 30s, store a block (should idle time out at 150s) clock.setTime(TimeUnit.SECONDS.toMillis(30)) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "2")) assert(monitor.isExecutorIdle("2")) assert(!monitor.timedOutExecutors(idleDeadline).contains("2")) // start exec 3 at 60s (should idle timeout at 120s, exec 1 should time out) clock.setTime(TimeUnit.SECONDS.toMillis(60)) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo)) assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1")) // store block on exec 3 (should now idle time out at 180s) @@ -205,7 +224,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { test("SPARK-27677: don't track blocks stored on disk when using shuffle service") { // First make sure that blocks on disk are counted when no shuffle service is available. 
- monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.DISK_ONLY)) assert(monitor.timedOutExecutors(idleDeadline).isEmpty) assert(monitor.timedOutExecutors(storageDeadline) === Seq("1")) @@ -213,7 +232,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) monitor = new ExecutorMonitor(conf, client, null, clock) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY)) monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.MEMORY_ONLY)) assert(monitor.timedOutExecutors(idleDeadline).isEmpty) @@ -236,9 +255,9 @@ class ExecutorMonitorSuite extends SparkFunSuite { test("track executors pending for removal") { knownExecs ++= Set("1", "2", "3") - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null)) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo)) clock.setTime(idleDeadline) assert(monitor.timedOutExecutors().toSet === Set("1", "2", "3")) assert(monitor.pendingRemovalCount === 0) @@ -286,7 +305,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2))) monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4))) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) // First a failed task, to make sure it does not count. @@ -342,7 +361,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { throw new IllegalStateException("No event should be sent.") } } - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.shuffleCleaned(0) } @@ -351,8 +370,8 @@ class ExecutorMonitorSuite extends SparkFunSuite { conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false) monitor = new ExecutorMonitor(conf, client, bus, clock) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo)) // Two separate jobs with separate shuffles. The first job will only run tasks on // executor 1, the second on executor 2. 
Ensures that jobs finishing don't affect @@ -401,7 +420,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { val stage = stageInfo(1, shuffleId = 0) monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage))) clock.advance(1000L) - monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1))) monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), new ExecutorMetrics, null)) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index b6ec869c42..f1bbe0b10e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,14 @@ object MimaExcludes { // Exclude rules for 3.0.x lazy val v30excludes = v24excludes ++ Seq( + // [SPARK-29306] Add support for Stage level scheduling for executors + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productElement"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productArity"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.canEqual"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productIterator"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.productPrefix"), + ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#RetrieveSparkAppConfig.toString"), + // [SPARK-29399][core] Remove old ExecutorPlugin interface. 
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ExecutorPlugin"), diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 8f6ae5904f..1876861700 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon import org.apache.spark.deploy.mesos.{config => mesosConfig} import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor} @@ -716,7 +717,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite val mockEndpointRef = mock[RpcEndpointRef] val mockAddress = mock[RpcAddress] val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty, - Map.empty, Map.empty) + Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) backend.driverEndpoint.askSync[Boolean](message) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 2e9576e335..1e8f4084ef 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -48,6 +48,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS import org.apache.spark.internal.config.UI._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -455,7 +456,8 @@ private[spark] class ApplicationMaster( val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt val executorCores = _sparkConf.get(EXECUTOR_CORES) val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "", - "", executorMemory, executorCores, appId, securityMgr, localResources) + "", executorMemory, executorCores, appId, securityMgr, localResources, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) dummyRunner.launchContextDebugInfo() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 7046ad7405..d9262bbac6 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -40,7 +40,8 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.internal.Logging import 
org.apache.spark.internal.config._ import org.apache.spark.network.util.JavaUtils -import org.apache.spark.util.{Utils, YarnContainerInfoHelper} +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.util.Utils private[yarn] class ExecutorRunnable( container: Option[Container], @@ -53,7 +54,8 @@ private[yarn] class ExecutorRunnable( executorCores: Int, appId: String, securityMgr: SecurityManager, - localResources: Map[String, LocalResource]) extends Logging { + localResources: Map[String, LocalResource], + resourceProfileId: Int) extends Logging { var rpc: YarnRPC = YarnRPC.create(conf) var nmClient: NMClient = _ @@ -72,7 +74,7 @@ private[yarn] class ExecutorRunnable( s""" |=============================================================================== - |YARN executor launch context: + |Default YARN executor launch context: | env: |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString} | command: @@ -207,7 +209,8 @@ private[yarn] class ExecutorRunnable( "--executor-id", executorId, "--hostname", hostname, "--cores", executorCores.toString, - "--app-id", appId) ++ + "--app-id", appId, + "--resourceProfileId", resourceProfileId.toString) ++ userClassPath ++ Seq( s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout", diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index f68be33e05..09414cbbe5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -38,6 +38,7 @@ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor @@ -565,7 +566,8 @@ private[yarn] class YarnAllocator( executorCores, appAttemptId.getApplicationId.toString, securityMgr, - localResources + localResources, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported ).run() updateInternalState() } catch { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala index d46424e5e9..669e39fb7c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala @@ -22,6 +22,7 @@ import java.net.URL import org.apache.spark.SparkEnv import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.YarnContainerInfoHelper @@ -39,7 +40,8 @@ private[spark] class YarnCoarseGrainedExecutorBackend( cores: Int, userClassPath: Seq[URL], env: SparkEnv, - resourcesFile: Option[String]) + resourcesFile: Option[String], + resourceProfile: ResourceProfile) extends CoarseGrainedExecutorBackend( rpcEnv, driverUrl, @@ -49,7 +51,8 @@ private[spark] class 
YarnCoarseGrainedExecutorBackend( cores, userClassPath, env, - resourcesFile) with Logging { + resourcesFile, + resourceProfile) with Logging { private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf) @@ -67,11 +70,11 @@ private[spark] class YarnCoarseGrainedExecutorBackend( private[spark] object YarnCoarseGrainedExecutorBackend extends Logging { def main(args: Array[String]): Unit = { - val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) => - CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) => + val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) => + CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) => new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env, - arguments.resourcesFileOpt) + arguments.resourcesFileOpt, resourceProfile) } val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$"))
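For readers following the test changes in this patch, a minimal self-contained sketch of how the builder API exercised above is used; the resource names, amounts, and discovery-script path below are illustrative placeholders, not values taken from the patch:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    object ResourceProfileBuilderSketch {
      def main(args: Array[String]): Unit = {
        // Executor-side requests: 2 GPUs located via a (placeholder) discovery script, plus 3 FPGAs.
        val ereqs = new ExecutorResourceRequests()
          .resource("gpu", 2, "/path/to/getGpus")   // script path is a placeholder
          .resource("fpga", 3)
        // Task-side requests: each task needs 1 GPU.
        val treqs = new TaskResourceRequests().resource("gpu", 1)
        // require() accumulates the mutable requests; build() returns an immutable ResourceProfile
        // whose id is what the executor reports back to the driver in RegisterExecutor.
        val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build
        println(s"Built resource profile ${rp.id}: ${rp.executorResources}")
      }
    }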