[SPARK-33288][SPARK-32661][K8S] Stage level scheduling support for Kubernetes

### What changes were proposed in this pull request?

This adds support for stage level scheduling to Kubernetes. Kubernetes can support dynamic allocation via the shuffle tracking option, which means we can support stage level scheduling by acquiring new executors.
The main changes are having the k8s cluster manager pass the resource profile id into the executors, and having the ExecutorPodsAllocator request executors based on the individual resource profiles. I tried to keep the code changes to a minimum. I specifically chose to leave ExecutorPodsSnapshot the way it was and construct the resource-profile-to-pod-states mapping on the fly, with a fast path when no other resource profiles are used, to keep the impact small. As a result, the main change required is just wrapping the allocation logic in a loop over each profile. The other main change is in the basic executor feature step, where we have to look at the resources in the ResourceProfile to request pods with the correct resources. Much of the other logic, such as the executor lifecycle manager, doesn't need to be resource profile aware.
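To make the shape of that change concrete, here is a minimal sketch of the per-profile reconciliation loop (illustration only, not the actual `ExecutorPodsAllocator` code; the `PodCounts` type and the `countPodsFor`/`requestPods` helpers are invented for the example):

```scala
// Hypothetical sketch: reconcile each ResourceProfile id against its own target executor count.
object AllocationLoopSketch {
  final case class PodCounts(running: Int, pending: Int, unacknowledged: Int)

  def reconcile(
      targetsPerProfileId: Map[Int, Int],        // ResourceProfile id -> desired executor count
      countPodsFor: Int => PodCounts,            // current pod state for one profile id
      requestPods: (Int, Int) => Unit): Unit = { // (profile id, additional pods) -> ask Kubernetes
    targetsPerProfileId.foreach { case (rpId, target) =>
      val counts = countPodsFor(rpId)
      val known = counts.running + counts.pending + counts.unacknowledged
      // Only request more pods for a profile once its earlier requests have been acknowledged.
      if (counts.unacknowledged == 0 && known < target) {
        requestPods(rpId, target - known)
      }
    }
  }
}
```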

This also adds support for [SPARK-32661] (Spark executors on K8S should request extra memory for off-heap allocations), because the stage level scheduling API already supports it and it makes sense to be consistent with YARN. This was started in PR https://github.com/apache/spark/pull/29477 but never updated, so I just did it here. To do this I moved a few functions around that are now used by both YARN and Kubernetes, so you will see some changes in Utils.
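As a rough sketch of the memory accounting this aligns with YARN (illustration only, assuming the standard `spark.memory.offHeap.*` config keys; this is not the actual helper added by the PR):

```scala
import org.apache.spark.SparkConf

// Illustrative sketch: off-heap memory counts toward the container memory request only when enabled.
object OffHeapMemorySketch {
  def offHeapMiB(conf: SparkConf): Long =
    if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) {
      conf.getSizeAsMb("spark.memory.offHeap.size", "0")
    } else {
      0L
    }

  // Total container memory = heap + overhead + pyspark memory (if any) + off-heap.
  def totalContainerMemoryMiB(
      heapMiB: Long,
      overheadMiB: Long,
      pysparkMiB: Long,
      conf: SparkConf): Long =
    heapMiB + overheadMiB + pysparkMiB + offHeapMiB(conf)
}
```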

### Why are the changes needed?

Add the feature to Kubernetes based on customer feedback.

### Does this PR introduce _any_ user-facing change?

Yes, the feature now works with K8s, but there are no underlying API changes.

### How was this patch tested?

Tested manually on a Kubernetes cluster and with unit tests.

Closes #30204 from tgravescs/stagek8sOrigSnapshotsRebase.

Lead-authored-by: Thomas Graves <tgraves@apache.org>
Co-authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>

@@ -29,6 +29,7 @@ import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.util.Utils
 /**
  * Resource profile to associate with an RDD. A ResourceProfile allows the user to
@@ -256,6 +257,8 @@ object ResourceProfile extends Logging {
   val UNKNOWN_RESOURCE_PROFILE_ID = -1
   val DEFAULT_RESOURCE_PROFILE_ID = 0
+  private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
   private lazy val nextProfileId = new AtomicInteger(0)
   private val DEFAULT_PROFILE_LOCK = new Object()
@@ -263,6 +266,7 @@ object ResourceProfile extends Logging {
   // var so that it can be reset for testing purposes.
   @GuardedBy("DEFAULT_PROFILE_LOCK")
   private var defaultProfile: Option[ResourceProfile] = None
+  private var defaultProfileExecutorResources: Option[DefaultProfileExecutorResources] = None
   private[spark] def getNextProfileId: Int = nextProfileId.getAndIncrement()
@@ -284,6 +288,14 @@ object ResourceProfile extends Logging {
     }
   }
+  private[spark] def getDefaultProfileExecutorResources(
+      conf: SparkConf): DefaultProfileExecutorResources = {
+    defaultProfileExecutorResources.getOrElse {
+      getOrCreateDefaultProfile(conf)
+      defaultProfileExecutorResources.get
+    }
+  }
   private def getDefaultTaskResources(conf: SparkConf): Map[String, TaskResourceRequest] = {
     val cpusPerTask = conf.get(CPUS_PER_TASK)
     val treqs = new TaskResourceRequests().cpus(cpusPerTask)
@@ -293,20 +305,26 @@ object ResourceProfile extends Logging {
   private def getDefaultExecutorResources(conf: SparkConf): Map[String, ExecutorResourceRequest] = {
     val ereqs = new ExecutorResourceRequests()
-    ereqs.cores(conf.get(EXECUTOR_CORES))
-    ereqs.memory(conf.get(EXECUTOR_MEMORY).toString)
-    conf.get(EXECUTOR_MEMORY_OVERHEAD).map(mem => ereqs.memoryOverhead(mem.toString))
-    conf.get(PYSPARK_EXECUTOR_MEMORY).map(mem => ereqs.pysparkMemory(mem.toString))
-    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
-      // Explicitly add suffix b as default unit of offHeapMemory is Mib
-      ereqs.offHeapMemory(conf.get(MEMORY_OFFHEAP_SIZE).toString + "b")
-    }
+    val cores = conf.get(EXECUTOR_CORES)
+    ereqs.cores(cores)
+    val memory = conf.get(EXECUTOR_MEMORY)
+    ereqs.memory(memory.toString)
+    val overheadMem = conf.get(EXECUTOR_MEMORY_OVERHEAD)
+    overheadMem.map(mem => ereqs.memoryOverhead(mem.toString))
+    val pysparkMem = conf.get(PYSPARK_EXECUTOR_MEMORY)
+    pysparkMem.map(mem => ereqs.pysparkMemory(mem.toString))
+    val offheapMem = Utils.executorOffHeapMemorySizeAsMb(conf)
+    ereqs.offHeapMemory(offheapMem.toString)
     val execReq = ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX)
     execReq.foreach { req =>
-      val name = req.id.resourceName
-      ereqs.resource(name, req.amount, req.discoveryScript.orElse(""),
+      ereqs.resource(req.id.resourceName, req.amount, req.discoveryScript.orElse(""),
         req.vendor.orElse(""))
     }
+    val customResourceNames = execReq.map(_.id.resourceName).toSet
+    val customResources = ereqs.requests.filter(v => customResourceNames.contains(v._1))
+    defaultProfileExecutorResources =
+      Some(DefaultProfileExecutorResources(cores, memory, offheapMem, pysparkMem,
+        overheadMem, customResources))
     ereqs.requests
   }
@@ -320,6 +338,7 @@ object ResourceProfile extends Logging {
   private[spark] def clearDefaultProfile(): Unit = {
     DEFAULT_PROFILE_LOCK.synchronized {
       defaultProfile = None
+      defaultProfileExecutorResources = None
     }
   }
@@ -342,6 +361,100 @@ object ResourceProfile extends Logging {
     rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
   }
/**
* Get offHeap memory size from [[ExecutorResourceRequest]]
* return 0 if MEMORY_OFFHEAP_ENABLED is false.
*/
private[spark] def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
execRequest: ExecutorResourceRequest): Long = {
Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
}
private[spark] case class ExecutorResourcesOrDefaults(
cores: Int,
executorMemoryMiB: Long,
memoryOffHeapMiB: Long,
pysparkMemoryMiB: Long,
memoryOverheadMiB: Long,
totalMemMiB: Long,
customResources: Map[String, ExecutorResourceRequest])
private[spark] case class DefaultProfileExecutorResources(
cores: Int,
executorMemoryMiB: Long,
memoryOffHeapMiB: Long,
pysparkMemoryMiB: Option[Long],
memoryOverheadMiB: Option[Long],
customResources: Map[String, ExecutorResourceRequest])
private[spark] def calculateOverHeadMemory(
overHeadMemFromConf: Option[Long],
executorMemoryMiB: Long,
overheadFactor: Double): Long = {
overHeadMemFromConf.getOrElse(math.max((overheadFactor * executorMemoryMiB).toInt,
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
}
/**
* Gets the full list of resources to allow a cluster manager to request the appropriate
* container. If the resource profile is not the default one we either get the resources
* specified in the profile or fall back to the default profile resource size for everything
* except for custom resources.
*/
private[spark] def getResourcesForClusterManager(
rpId: Int,
execResources: Map[String, ExecutorResourceRequest],
overheadFactor: Double,
conf: SparkConf,
isPythonApp: Boolean,
resourceMappings: Map[String, String]): ExecutorResourcesOrDefaults = {
val defaultResources = getDefaultProfileExecutorResources(conf)
// set all the default values, which may change for custom ResourceProfiles
var cores = defaultResources.cores
var executorMemoryMiB = defaultResources.executorMemoryMiB
var memoryOffHeapMiB = defaultResources.memoryOffHeapMiB
var pysparkMemoryMiB = defaultResources.pysparkMemoryMiB.getOrElse(0L)
var memoryOverheadMiB = calculateOverHeadMemory(defaultResources.memoryOverheadMiB,
executorMemoryMiB, overheadFactor)
val finalCustomResources = if (rpId != DEFAULT_RESOURCE_PROFILE_ID) {
val customResources = new mutable.HashMap[String, ExecutorResourceRequest]
execResources.foreach { case (r, execReq) =>
r match {
case ResourceProfile.MEMORY =>
executorMemoryMiB = execReq.amount
case ResourceProfile.OVERHEAD_MEM =>
memoryOverheadMiB = execReq.amount
case ResourceProfile.PYSPARK_MEM =>
pysparkMemoryMiB = execReq.amount
case ResourceProfile.OFFHEAP_MEM =>
memoryOffHeapMiB = executorOffHeapMemorySizeAsMb(conf, execReq)
case ResourceProfile.CORES =>
cores = execReq.amount.toInt
case rName =>
val nameToUse = resourceMappings.get(rName).getOrElse(rName)
customResources(nameToUse) = execReq
}
}
customResources.toMap
} else {
defaultResources.customResources.map { case (rName, execReq) =>
val nameToUse = resourceMappings.get(rName).getOrElse(rName)
(nameToUse, execReq)
}
}
// only add in pyspark memory if actually a python application
val pysparkMemToUseMiB = if (isPythonApp) {
pysparkMemoryMiB
} else {
0L
}
val totalMemMiB =
(executorMemoryMiB + memoryOverheadMiB + memoryOffHeapMiB + pysparkMemToUseMiB)
ExecutorResourcesOrDefaults(cores, executorMemoryMiB, memoryOffHeapMiB,
pysparkMemToUseMiB, memoryOverheadMiB, totalMemMiB, finalCustomResources)
}
   private[spark] val PYSPARK_MEMORY_LOCAL_PROPERTY = "resource.pyspark.memory"
   private[spark] val EXECUTOR_CORES_LOCAL_PROPERTY = "resource.executor.cores"
 }


@@ -52,18 +52,25 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
   private val master = sparkConf.getOption("spark.master")
-  private val isNotYarn = master.isDefined && !master.get.equals("yarn")
-  private val errorForTesting = !isTesting || sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
+  private val isYarn = master.isDefined && master.get.equals("yarn")
+  private val isK8s = master.isDefined && master.get.startsWith("k8s://")
+  private val notRunningUnitTests = !isTesting
+  private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
   // If we use anything except the default profile, its only supported on YARN right now.
   // Throw an exception if not supported.
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnAndNotDefaultProfile = isNotDefaultProfile && isNotYarn
-    val YarnNotDynAllocAndNotDefaultProfile = isNotDefaultProfile && !isNotYarn && !dynamicEnabled
-    if (errorForTesting && (notYarnAndNotDefaultProfile || YarnNotDynAllocAndNotDefaultProfile)) {
-      throw new SparkException("ResourceProfiles are only supported on YARN with dynamic " +
-        "allocation enabled.")
+    val notYarnOrK8sAndNotDefaultProfile = isNotDefaultProfile && !(isYarn || isK8s)
+    val YarnOrK8sNotDynAllocAndNotDefaultProfile =
+      isNotDefaultProfile && (isYarn || isK8s) && !dynamicEnabled
+    // We want the exception to be thrown only when we are specifically testing for the
+    // exception or in a real application. Otherwise in all other testing scenarios we want
+    // to skip throwing the exception so that we can test in other modes to make testing easier.
+    if ((notRunningUnitTests || testExceptionThrown) &&
+        (notYarnOrK8sAndNotDefaultProfile || YarnOrK8sNotDynAllocAndNotDefaultProfile)) {
+      throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
+        "with dynamic allocation enabled.")
     }
     true
   }


@@ -2971,6 +2971,27 @@ private[spark] object Utils extends Logging {
     metadata.append("]")
     metadata.toString
   }
/**
* Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
*/
def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
checkOffHeapEnabled(sparkConf, sizeInMB).toInt
}
/**
* return 0 if MEMORY_OFFHEAP_ENABLED is false.
*/
def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = {
if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
require(offHeapSize > 0,
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
offHeapSize
} else {
0
}
}
 }
 private[util] object CallerContext extends Logging {


@@ -47,8 +47,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val rpmanager = new ResourceProfileManager(conf, listenerBus)
     val defaultProf = rpmanager.defaultResourceProfile
     assert(defaultProf.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    assert(defaultProf.executorResources.size === 2,
-      "Executor resources should contain cores and memory by default")
+    assert(defaultProf.executorResources.size === 3,
+      "Executor resources should contain cores, heap and offheap memory by default")
     assert(defaultProf.executorResources(ResourceProfile.CORES).amount === 4,
       s"Executor resources should have 4 cores")
   }
@@ -67,7 +67,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
       rpmanager.isSupported(immrprof)
     }.getMessage()
-    assert(error.contains("ResourceProfiles are only supported on YARN with dynamic allocation"))
+    assert(error.contains(
+      "ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation"))
   }
   test("isSupported yarn with dynamic allocation") {
@@ -84,7 +85,22 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     assert(rpmanager.isSupported(immrprof) == true)
   }
-  test("isSupported yarn with local mode") {
+  test("isSupported k8s with dynamic allocation") {
val conf = new SparkConf().setMaster("k8s://foo").set(EXECUTOR_CORES, 4)
conf.set(DYN_ALLOCATION_ENABLED, true)
conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
val rpmanager = new ResourceProfileManager(conf, listenerBus)
// default profile should always work
val defaultProf = rpmanager.defaultResourceProfile
val rprof = new ResourceProfileBuilder()
val gpuExecReq =
new ExecutorResourceRequests().resource("gpu", 2, "someScript", "nvidia")
val immrprof = rprof.require(gpuExecReq).build
assert(rpmanager.isSupported(immrprof) == true)
}
test("isSupported with local mode") {
val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4) val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true") conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
val rpmanager = new ResourceProfileManager(conf, listenerBus) val rpmanager = new ResourceProfileManager(conf, listenerBus)
@ -98,7 +114,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
rpmanager.isSupported(immrprof) rpmanager.isSupported(immrprof)
}.getMessage() }.getMessage()
assert(error.contains("ResourceProfiles are only supported on YARN with dynamic allocation")) assert(error.contains(
"ResourceProfiles are only supported on YARN and Kubernetes with dynamic allocation"))
} }
test("ResourceProfileManager has equivalent profile") { test("ResourceProfileManager has equivalent profile") {


@@ -43,8 +43,8 @@ class ResourceProfileSuite extends SparkFunSuite {
   test("Default ResourceProfile") {
     val rprof = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
     assert(rprof.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    assert(rprof.executorResources.size === 2,
-      "Executor resources should contain cores and memory by default")
+    assert(rprof.executorResources.size === 3,
+      "Executor resources should contain cores, heap and offheap memory by default")
     assert(rprof.executorResources(ResourceProfile.CORES).amount === 1,
       "Executor resources should have 1 core")
     assert(rprof.getExecutorCores.get === 1,
@@ -55,8 +55,8 @@ class ResourceProfileSuite extends SparkFunSuite {
       "pyspark memory empty if not specified")
     assert(rprof.executorResources.get(ResourceProfile.OVERHEAD_MEM) == None,
       "overhead memory empty if not specified")
-    assert(rprof.executorResources.get(ResourceProfile.OFFHEAP_MEM) == None,
-      "offHeap memory empty if not specified")
+    assert(rprof.executorResources(ResourceProfile.OFFHEAP_MEM).amount === 0,
+      "Executor resources should have 0 offheap memory")
     assert(rprof.taskResources.size === 1,
       "Task resources should just contain cpus by default")
     assert(rprof.taskResources(ResourceProfile.CPUS).amount === 1,


@@ -1406,6 +1406,33 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(hostnamePort._1.equals("localhost"))
     assert(hostnamePort._2 === 0)
   }
test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(new SparkConf())
assert(executorOffHeapMemory == 0)
}
test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
val offHeapMemoryInMB = 50
val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
val sparkConf = new SparkConf()
.set(MEMORY_OFFHEAP_ENABLED, true)
.set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)
assert(executorOffHeapMemory == offHeapMemoryInMB)
}
test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
"but MEMORY_OFFHEAP_SIZE not config scene") {
val sparkConf = new SparkConf()
.set(MEMORY_OFFHEAP_ENABLED, true)
val expected =
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
val message = intercept[IllegalArgumentException] {
Utils.executorOffHeapMemorySizeAsMb(sparkConf)
}.getMessage
assert(message.contains(expected))
}
 }
 private class SimpleExtension


@@ -3051,6 +3051,6 @@ See your cluster manager specific page for requirements and details on each of
 # Stage Level Scheduling Overview
 The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at the start of the application and them be idle while the ETL stage is being run.
-This is only available for the RDD API in Scala, Java, and Python and requires dynamic allocation to be enabled. It is only available on YARN at this time. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page for more implementation details.
+This is only available for the RDD API in Scala, Java, and Python. It is available on YARN and Kubernetes when dynamic allocation is enabled. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page for more implementation details.
 See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this feature. The current implementation acquires new executors for each `ResourceProfile` created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more then 1 ResourceProfile to an RDD, Spark will throw an exception by default. See config `spark.scheduler.resource.profileMergeConflicts` to control that behavior. The current merge strategy Spark implements when `spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources.
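For example, a stage that needs GPUs can be given its own profile (illustrative snippet; the RDD name, resource amounts, and discovery script path are placeholders):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

object StageLevelSchedulingExample {
  def withGpuProfile(etlOutput: RDD[Array[Double]]): RDD[Array[Double]] = {
    val execReqs = new ExecutorResourceRequests()
      .cores(4)
      .memory("8g")
      .resource("gpu", 1, "/opt/spark/scripts/getGpusResources.sh", "nvidia.com")
    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
    val gpuProfile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build
    // Executors matching gpuProfile are acquired when the stage computing this RDD runs.
    etlOutput.withResources(gpuProfile)
  }
}
```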


@@ -1399,3 +1399,7 @@ Spark automatically handles translating the Spark configs <code>spark.{driver/ex
 Kubernetes does not tell Spark the addresses of the resources allocated to each container. For that reason, the user must specify a discovery script that gets run by the executor on startup to discover what resources are available to that executor. You can find an example scripts in `examples/src/main/scripts/getGpusResources.sh`. The script must have execute permissions set and the user should setup permissions to not allow malicious users to modify it. The script should write to STDOUT a JSON string in the format of the ResourceInformation class. This has the resource name and an array of resource addresses available to just that executor.
### Stage Level Scheduling Overview
Stage level scheduling is supported on Kubernetes when dynamic allocation is enabled. This also requires <code>spark.dynamicAllocation.shuffleTracking.enabled</code> to be enabled since Kubernetes doesn't support an external shuffle service at this time. The order in which containers for different profiles is requested from Kubernetes is not guaranteed. Note that since dynamic allocation on Kubernetes requires the shuffle tracking feature, this means that executors from previous stages that used a different ResourceProfile may not idle timeout due to having shuffle data on them. This could result in using more cluster resources and in the worst case if there are no remaining resources on the Kubernetes cluster then Spark could potentially hang. You may consider looking at config <code>spark.dynamicAllocation.shuffleTracking.timeout</code> to set a timeout, but that could result in data having to be recomputed if the shuffle data is really needed.
Note, there is a difference in the way pod template resources are handled between the base default profile and custom ResourceProfiles. Any resources specified in the pod template file will only be used with the base default profile. If you create custom ResourceProfiles be sure to include all necessary resources there since the resources from the template file will not be propogated to custom ResourceProfiles.
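As an illustrative sketch of the configuration described above (the master URL is a placeholder; the keys are the standard dynamic allocation configs):

```scala
import org.apache.spark.SparkConf

object K8sStageLevelConfExample {
  // Minimal settings needed for stage level scheduling on Kubernetes.
  def conf(): SparkConf = new SparkConf()
    .setMaster("k8s://https://example-apiserver:6443") // placeholder API server address
    .set("spark.dynamicAllocation.enabled", "true")
    // Required on Kubernetes since an external shuffle service is not supported.
    .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    // Optional: bound how long executors holding shuffle data are kept around.
    .set("spark.dynamicAllocation.shuffleTracking.timeout", "30min")
}
```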


@@ -644,6 +644,7 @@ YARN does not tell Spark the addresses of the resources allocated to each contai
 # Stage Level Scheduling Overview
 Stage level scheduling is supported on YARN when dynamic allocation is enabled. One thing to note that is YARN specific is that each ResourceProfile requires a different container priority on YARN. The mapping is simply the ResourceProfile id becomes the priority, on YARN lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to keep in mind.
Note there is a difference in the way custom resources are handled between the base default profile and custom ResourceProfiles. To allow for the user to request YARN containers with extra resources without Spark scheduling on them, the user can specify resources via the <code>spark.yarn.executor.resource.</code> config. Those configs are only used in the base default profile though and do not get propogated into any other custom ResourceProfiles. This is because there would be no way to remove them if you wanted a stage to not have them. This results in your default profile getting custom resources defined in <code>spark.yarn.executor.resource.</code> plus spark defined resources of GPU or FPGA. Spark converts GPU and FPGA resources into the YARN built in types <code>yarn.io/gpu</code>) and <code>yarn.io/fpga</code>, but does not know the mapping of any other resources. Any other Spark custom resources are not propogated to YARN for the default profile. So if you want Spark to schedule based off a custom resource and have it requested from YARN, you must specify it in both YARN (<code>spark.yarn.{driver/executor}.resource.</code>) and Spark (<code>spark.{driver/executor}.resource.</code>) configs. Leave the Spark config off if you only want YARN containers with the extra resources but Spark not to schedule using them. Now for custom ResourceProfiles, it doesn't currently have a way to only specify YARN resources without Spark scheduling off of them. This means for custom ResourceProfiles we propogate all the resources defined in the ResourceProfile to YARN. We still convert GPU and FPGA to the YARN build in types as well. This requires that the name of any custom resources you specify match what they are defined as in YARN.
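As an illustrative sketch of the two config namespaces described above (the resource name `acceleratorX` and the script path are hypothetical):

```scala
import org.apache.spark.SparkConf

object YarnCustomResourceExample {
  def conf(): SparkConf = new SparkConf()
    // Ask YARN to allocate the resource in executor containers (base default profile only):
    .set("spark.yarn.executor.resource.acceleratorX.amount", "2")
    // Also declare it to Spark only if tasks should be scheduled against it:
    .set("spark.executor.resource.acceleratorX.amount", "2")
    .set("spark.executor.resource.acceleratorX.discoveryScript", "/opt/discoverAcceleratorX.sh")
    .set("spark.task.resource.acceleratorX.amount", "1")
}
```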
 # Important notes


@@ -21,6 +21,7 @@ private[spark] object Constants {
   // Labels
   val SPARK_APP_ID_LABEL = "spark-app-selector"
   val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
+  val SPARK_RESOURCE_PROFILE_ID_LABEL = "spark-exec-resourceprofile-id"
   val SPARK_ROLE_LABEL = "spark-role"
   val SPARK_POD_DRIVER_ROLE = "driver"
   val SPARK_POD_EXECUTOR_ROLE = "executor"
@@ -63,6 +64,7 @@ private[spark] object Constants {
   val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
   val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
   val ENV_SPARK_USER = "SPARK_USER"
+  val ENV_RESOURCE_PROFILE_ID = "SPARK_RESOURCE_PROFILE_ID"
   // Spark app configs for containers
   val SPARK_CONF_VOLUME = "spark-conf-volume"
   val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
@@ -84,7 +86,6 @@ private[spark] object Constants {
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
   val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
-  val MEMORY_OVERHEAD_MIN_MIB = 384L
   val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
   // Hadoop Configuration


@@ -26,6 +26,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.util.Utils
 /**
@@ -132,7 +133,8 @@ private[spark] class KubernetesExecutorConf(
     sparkConf: SparkConf,
     val appId: String,
     val executorId: String,
-    val driverPod: Option[Pod])
+    val driverPod: Option[Pod],
+    val resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID)
   extends KubernetesConf(sparkConf) with Logging {
   override val resourceNamePrefix: String = {
@@ -144,7 +146,8 @@ private[spark] class KubernetesExecutorConf(
     val presetLabels = Map(
       SPARK_EXECUTOR_ID_LABEL -> executorId,
       SPARK_APP_ID_LABEL -> appId,
-      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
+      SPARK_RESOURCE_PROFILE_ID_LABEL -> resourceProfileId.toString)
     val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
@@ -217,8 +220,9 @@ private[spark] object KubernetesConf {
       sparkConf: SparkConf,
       executorId: String,
       appId: String,
-      driverPod: Option[Pod]): KubernetesExecutorConf = {
-    new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod)
+      driverPod: Option[Pod],
+      resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID): KubernetesExecutorConf = {
+    new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod, resourceProfileId)
   }
   def getResourceNamePrefix(appName: String): String = {


@@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
@@ -66,7 +67,8 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt,
+      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
   override def configurePod(pod: SparkPod): SparkPod = {


@@ -26,14 +26,15 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.Python._
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 private[spark] class BasicExecutorFeatureStep(
     kubernetesConf: KubernetesExecutorConf,
-    secMgr: SecurityManager)
+    secMgr: SecurityManager,
+    resourceProfile: ResourceProfile)
   extends KubernetesFeatureConfigStep with Logging {
   // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
@@ -50,33 +51,43 @@ private[spark] class BasicExecutorFeatureStep(
     kubernetesConf.get(DRIVER_HOST_ADDRESS),
     kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-  private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryString = kubernetesConf.get(
-    EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString)
-  private val memoryOverheadMiB = kubernetesConf
-    .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max(
-      (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
-  private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
-  private val executorMemoryTotal =
-    if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) {
-      executorMemoryWithOverhead +
-        kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
-    } else {
-      executorMemoryWithOverhead
-    }
-  private val executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
+  private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+  private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
+  val execResources = ResourceProfile.getResourcesForClusterManager(
+    resourceProfile.id,
+    resourceProfile.executorResources,
+    kubernetesConf.get(MEMORY_OVERHEAD_FACTOR),
+    kubernetesConf.sparkConf,
+    isPythonApp,
+    Map.empty)
+  private val executorMemoryString = s"${execResources.executorMemoryMiB}m"
+  // we don't include any kubernetes conf specific requests or limits when using custom
+  // ResourceProfiles because we don't have a way of overriding them if needed
   private val executorCoresRequest =
-    if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
+    if (isDefaultProfile && kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
       kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
     } else {
-      executorCores.toString
+      execResources.cores.toString
     }
   private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
private def buildExecutorResourcesQuantities(
customResources: Set[ExecutorResourceRequest]): Map[String, Quantity] = {
customResources.map { request =>
val vendorDomain = if (request.vendor.nonEmpty) {
request.vendor
} else {
throw new SparkException(s"Resource: ${request.resourceName} was requested, " +
"but vendor was not specified.")
}
val quantity = new Quantity(request.amount.toString)
(KubernetesConf.buildKubernetesResourceName(vendorDomain, request.resourceName), quantity)
}.toMap
}
   override def configurePod(pod: SparkPod): SparkPod = {
     val name = s"$executorPodNamePrefix-exec-${kubernetesConf.executorId}"
@@ -89,22 +100,21 @@ private[spark] class BasicExecutorFeatureStep(
       // Replace dangerous characters in the remaining string with a safe alternative.
       .replaceAll("[^\\w-]+", "_")
-    val executorMemoryQuantity = new Quantity(s"${executorMemoryTotal}Mi")
+    val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
     val executorCpuQuantity = new Quantity(executorCoresRequest)
     val executorResourceQuantities =
-      KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_PREFIX,
-        kubernetesConf.sparkConf)
+      buildExecutorResourcesQuantities(execResources.customResources.values.toSet)
     val executorEnv: Seq[EnvVar] = {
       (Seq(
         (ENV_DRIVER_URL, driverUrl),
-        (ENV_EXECUTOR_CORES, executorCores.toString),
+        (ENV_EXECUTOR_CORES, execResources.cores.toString),
         (ENV_EXECUTOR_MEMORY, executorMemoryString),
         (ENV_APPLICATION_ID, kubernetesConf.appId),
         // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
         (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
-        (ENV_EXECUTOR_ID, kubernetesConf.executorId)
+        (ENV_EXECUTOR_ID, kubernetesConf.executorId),
+        (ENV_RESOURCE_PROFILE_ID, resourceProfile.id.toString)
       ) ++ kubernetesConf.environment).map { case (k, v) =>
         new EnvVarBuilder()
           .withName(k)
@@ -166,6 +176,13 @@ private[spark] class BasicExecutorFeatureStep(
         .build()
     }
if (!isDefaultProfile) {
if (pod.container != null && pod.container.getResources() != null) {
logDebug("NOT using the default profile and removing template resources")
pod.container.setResources(new ResourceRequirements())
}
}
     val executorContainer = new ContainerBuilder(pod.container)
       .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
       .withImage(executorContainerImage)
@@ -184,14 +201,18 @@ private[spark] class BasicExecutorFeatureStep(
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
       .build()
-    val containerWithLimitCores = executorLimitCores.map { limitCores =>
-      val executorCpuLimitQuantity = new Quantity(limitCores)
-      new ContainerBuilder(executorContainer)
-        .editResources()
-          .addToLimits("cpu", executorCpuLimitQuantity)
-        .endResources()
-        .build()
-    }.getOrElse(executorContainer)
+    val containerWithLimitCores = if (isDefaultProfile) {
+      executorLimitCores.map { limitCores =>
+        val executorCpuLimitQuantity = new Quantity(limitCores)
+        new ContainerBuilder(executorContainer)
+          .editResources()
+            .addToLimits("cpu", executorCpuLimitQuantity)
+          .endResources()
+          .build()
+      }.getOrElse(executorContainer)
+    } else {
+      executorContainer
+    }
     val containerWithLifecycle =
       if (!kubernetesConf.workerDecommissioning) {
         logInfo("Decommissioning not enabled, skipping shutdown script")


@@ -17,13 +17,14 @@
 package org.apache.spark.scheduler.cluster.k8s
 import java.time.Instant
-import java.time.format.DateTimeParseException
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.NonFatal
-import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, PodBuilder}
+import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -33,6 +34,8 @@ import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
 import org.apache.spark.util.{Clock, Utils}
 private[spark] class ExecutorPodsAllocator(
@@ -45,7 +48,11 @@ private[spark] class ExecutorPodsAllocator(
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
-  private val totalExpectedExecutors = new AtomicInteger(0)
+  // ResourceProfile id -> total expected executors per profile, currently we don't remove
+  // any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749
+  private val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()
+  private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
   private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
@@ -73,8 +80,8 @@ private[spark] class ExecutorPodsAllocator(
     s"namespace $namespace (this was supposed to be the driver pod.).")))
   // Executor IDs that have been requested from Kubernetes but have not been detected in any
-  // snapshot yet. Mapped to the timestamp when they were created.
-  private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, Long]
+  // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created.
+  private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, (Int, Long)]
   private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
@@ -93,9 +100,12 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
-  def setTotalExpectedExecutors(total: Int): Unit = {
-    logDebug(s"Set totalExpectedExecutors to $total")
-    totalExpectedExecutors.set(total)
+  def setTotalExpectedExecutors(resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+    resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
+      rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
+      totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs)
+    }
+    logDebug(s"Set total expected execs to $totalExpectedExecutorsPerResourceProfileId")
     if (!hasPendingPods.get()) {
       snapshotsStore.notifySubscribers()
     }
@@ -114,7 +124,7 @@ private[spark] class ExecutorPodsAllocator(
       // both the creation and deletion events. In either case, delete the missing pod
       // if possible, and mark such a pod to be rescheduled below.
       val currentTime = clock.getTimeMillis()
-      val timedOut = newlyCreatedExecutors.flatMap { case (execId, timeCreated) =>
+      val timedOut = newlyCreatedExecutors.flatMap { case (execId, (_, timeCreated)) =>
         if (currentTime - timeCreated > podCreationTimeout) {
           Some(execId)
         } else {
@@ -147,136 +157,171 @@ private[spark] class ExecutorPodsAllocator(
       lastSnapshot = snapshots.last
     }
-    val currentRunningCount = lastSnapshot.executorPods.values.count {
-      case PodRunning(_) => true
-      case _ => false
-    }
-    val currentPendingExecutors = lastSnapshot.executorPods
-      .filter {
-        case (_, PodPending(_)) => true
-        case _ => false
-      }
     // Make a local, non-volatile copy of the reference since it's used multiple times. This
     // is the only method that modifies the list, so this is safe.
     var _deletedExecutorIds = deletedExecutorIds
     if (snapshots.nonEmpty) {
-      logDebug(s"Pod allocation status: $currentRunningCount running, " +
-        s"${currentPendingExecutors.size} pending, " +
-        s"${newlyCreatedExecutors.size} unacknowledged.")
       val existingExecs = lastSnapshot.executorPods.keySet
       _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }
-    val currentTotalExpectedExecutors = totalExpectedExecutors.get
-    // This variable is used later to print some debug logs. It's updated when cleaning up
-    // excess pod requests, since currentPendingExecutors is immutable.
-    var knownPendingCount = currentPendingExecutors.size
-    // It's possible that we have outstanding pods that are outdated when dynamic allocation
-    // decides to downscale the application. So check if we can release any pending pods early
-    // instead of waiting for them to time out. Drop them first from the unacknowledged list,
-    // then from the pending. However, in order to prevent too frequent frunctuation, newly
-    // requested pods are protected during executorIdleTimeout period.
-    //
-    // TODO: with dynamic allocation off, handle edge cases if we end up with more running
-    // executors than expected.
-    val knownPodCount = currentRunningCount + currentPendingExecutors.size +
-      newlyCreatedExecutors.size
-    if (knownPodCount > currentTotalExpectedExecutors) {
-      val excess = knownPodCount - currentTotalExpectedExecutors
-      val knownPendingToDelete = currentPendingExecutors
-        .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
-        .map { case (id, _) => id }
-        .take(excess - newlyCreatedExecutors.size)
-      val toDelete = newlyCreatedExecutors
-        .filter(x => currentTime - x._2 > executorIdleTimeout)
-        .keys.take(excess).toList ++ knownPendingToDelete
-      if (toDelete.nonEmpty) {
-        logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
-        _deletedExecutorIds = _deletedExecutorIds ++ toDelete
-        Utils.tryLogNonFatalError {
-          kubernetesClient
-            .pods()
-            .withField("status.phase", "Pending")
-            .withLabel(SPARK_APP_ID_LABEL, applicationId)
-            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
-            .delete()
-          newlyCreatedExecutors --= toDelete
-          knownPendingCount -= knownPendingToDelete.size
-        }
-      }
-    }
-    if (newlyCreatedExecutors.isEmpty
-      && knownPodCount < currentTotalExpectedExecutors) {
-      val numExecutorsToAllocate = math.min(
-        currentTotalExpectedExecutors - knownPodCount, podAllocationSize)
-      logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.")
-      for ( _ <- 0 until numExecutorsToAllocate) {
-        val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
-        val executorConf = KubernetesConf.createExecutorConf(
-          conf,
-          newExecutorId.toString,
-          applicationId,
-          driverPod)
-        val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
-          kubernetesClient)
-        val executorPod = resolvedExecutorSpec.pod
-        val podWithAttachedContainer = new PodBuilder(executorPod.pod)
-          .editOrNewSpec()
-          .addToContainers(executorPod.container)
-          .endSpec()
-          .build()
-        val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
-        try {
-          val resources = resolvedExecutorSpec.executorKubernetesResources
-          addOwnerReference(createdExecutorPod, resources)
-          resources
-            .filter(_.getKind == "PersistentVolumeClaim")
-            .foreach { resource =>
-              val pvc = resource.asInstanceOf[PersistentVolumeClaim]
-              logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
-                s"StorageClass ${pvc.getSpec.getStorageClassName}")
-              kubernetesClient.persistentVolumeClaims().create(pvc)
-            }
-          newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
-          logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
-        } catch {
-          case NonFatal(e) =>
-            kubernetesClient.pods().delete(createdExecutorPod)
-            throw e
+    // Map the pods into per ResourceProfile id so we can check per ResourceProfile,
+    // add a fast path if not using other ResourceProfiles.
+    val rpIdToExecsAndPodState =
+      mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]()
+    if (totalExpectedExecutorsPerResourceProfileId.size <= 1) {
+      rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) =
+        mutable.HashMap.empty ++= lastSnapshot.executorPods
+    } else {
+      lastSnapshot.executorPods.foreach { case (execId, execPodState) =>
+        val rpId = execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt
+        val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId,
+          mutable.HashMap[Long, ExecutorPodState]())
+        execPods(execId) = execPodState
+      }
+    }
+    var totalPendingCount = 0
+    // The order we request executors for each ResourceProfile is not guaranteed.
+    totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, targetNum) =>
+      val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)
+      val currentRunningCount = podsForRpId.values.count {
+        case PodRunning(_) => true
+        case _ => false
+      }
+      val currentPendingExecutors = podsForRpId.filter {
+        case (_, PodPending(_)) => true
+        case _ => false
+      }
+      // This variable is used later to print some debug logs. It's updated when cleaning up
+      // excess pod requests, since currentPendingExecutors is immutable.
+      var knownPendingCount = currentPendingExecutors.size
+      val newlyCreatedExecutorsForRpId =
+        newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
+          rpId == waitingRpId
+        }
+      if (podsForRpId.nonEmpty) {
+        logDebug(s"ResourceProfile Id: $rpId " +
+          s"pod allocation status: $currentRunningCount running, " +
+          s"${currentPendingExecutors.size} pending. " +
+          s"${newlyCreatedExecutorsForRpId.size} unacknowledged.")
+      }
+      // It's possible that we have outstanding pods that are outdated when dynamic allocation
+      // decides to downscale the application. So check if we can release any pending pods early
+      // instead of waiting for them to time out. Drop them first from the unacknowledged list,
+      // then from the pending. However, in order to prevent too frequent fluctuation, newly
+      // requested pods are protected during executorIdleTimeout period.
+      //
+      // TODO: with dynamic allocation off, handle edge cases if we end up with more running
+      // executors than expected.
val knownPodCount = currentRunningCount + currentPendingExecutors.size +
newlyCreatedExecutorsForRpId.size
if (knownPodCount > targetNum) {
val excess = knownPodCount - targetNum
val knownPendingToDelete = currentPendingExecutors
.filter(x => isExecutorIdleTimedOut(x._2, currentTime))
.map { case (id, _) => id }
.take(excess - newlyCreatedExecutorsForRpId.size)
val toDelete = newlyCreatedExecutorsForRpId
.filter { case (_, (_, createTime)) =>
currentTime - createTime > executorIdleTimeout
}.keys.take(excess).toList ++ knownPendingToDelete
if (toDelete.nonEmpty) {
logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
_deletedExecutorIds = _deletedExecutorIds ++ toDelete
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
.delete()
newlyCreatedExecutors --= toDelete
knownPendingCount -= knownPendingToDelete.size
}
}
}
if (newlyCreatedExecutorsForRpId.isEmpty
&& knownPodCount < targetNum) {
requestNewExecutors(targetNum, knownPodCount, applicationId, rpId)
}
totalPendingCount += knownPendingCount
// The code below just prints debug messages, which are only useful when there's a change
// in the snapshot state. Since the messages are a little spammy, avoid them when we know
// there are no useful updates.
if (log.isDebugEnabled && snapshots.nonEmpty) {
val outstanding = knownPendingCount + newlyCreatedExecutorsForRpId.size
if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
"equal to the number of requested executors. Not scaling up further.")
} else {
if (outstanding > 0) {
logDebug(s"Still waiting for $outstanding executors for ResourceProfile " +
s"Id $rpId before requesting more.")
}
}
}
}
deletedExecutorIds = _deletedExecutorIds

// Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
// update method when not needed.
hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)

// The code below just prints debug messages, which are only useful when there's a change
// in the snapshot state. Since the messages are a little spammy, avoid them when we know
// there are no useful updates.
if (!log.isDebugEnabled || snapshots.isEmpty) {
return
}

if (currentRunningCount >= currentTotalExpectedExecutors && !dynamicAllocationEnabled) {
logDebug("Current number of running executors is equal to the number of requested" +
" executors. Not scaling up further.")
} else {
val outstanding = knownPendingCount + newlyCreatedExecutors.size
if (outstanding > 0) {
logDebug(s"Still waiting for $outstanding executors before requesting more.")

hasPendingPods.set(totalPendingCount + newlyCreatedExecutors.size > 0)
}

private def requestNewExecutors(
expected: Int,
running: Int,
applicationId: String,
resourceProfileId: Int): Unit = {
val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
conf,
newExecutorId.toString,
applicationId,
driverPod,
resourceProfileId)
val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient, rpIdToResourceProfile(resourceProfileId))
val executorPod = resolvedExecutorSpec.pod
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
try {
val resources = resolvedExecutorSpec.executorKubernetesResources
addOwnerReference(createdExecutorPod, resources)
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
kubernetesClient.persistentVolumeClaims().create(pvc)
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdExecutorPod)
throw e
}
}
}
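As a rough illustration of the allocation decision the per-profile loop above makes each cycle, here is a minimal, self-contained sketch. The names (podsToRequest, batchSize) are illustrative only and are not part of this patch; the real code additionally handles pending-pod cleanup and timeouts.

    object AllocationSketch {
      // For each resource profile id: request up to (target - known) pods,
      // capped by the allocation batch size, and never a negative number.
      def podsToRequest(
          targetPerRpId: Map[Int, Int],
          knownPerRpId: Map[Int, Int],
          batchSize: Int): Map[Int, Int] = {
        targetPerRpId.map { case (rpId, target) =>
          val known = knownPerRpId.getOrElse(rpId, 0)
          rpId -> math.max(0, math.min(target - known, batchSize))
        }
      }

      def main(args: Array[String]): Unit = {
        // Profile 0 wants 5 executors with 3 already known; profile 1 wants 4 with none; batch of 2.
        println(podsToRequest(Map(0 -> 5, 1 -> 4), Map(0 -> 3), 2)) // Map(0 -> 2, 1 -> 2)
      }
    }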

View file

@ -78,7 +78,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def start(): Unit = {
super.start()
podAllocator.setTotalExpectedExecutors(initialExecutors)
val initExecs = Map(defaultProfile -> initialExecutors)
podAllocator.setTotalExpectedExecutors(initExecs)
lifecycleEventHandler.start(this)
podAllocator.start(applicationId())
watchEvents.start(applicationId())
@ -121,7 +122,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def doRequestTotalExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
podAllocator.setTotalExpectedExecutors(resourceProfileToTotalExecs(defaultProfile))
podAllocator.setTotalExpectedExecutors(resourceProfileToTotalExecs)
Future.successful(true)
}
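The behavioural change in this hunk is that the backend now forwards the whole ResourceProfile-to-count map instead of only the default profile's entry. A tiny sketch of that call shape, with stand-in types (Profile and Allocator are placeholders, not Spark classes):

    final case class Profile(id: Int)

    trait Allocator {
      def setTotalExpectedExecutors(targets: Map[Profile, Int]): Unit
    }

    object BackendSketch {
      def main(args: Array[String]): Unit = {
        val allocator = new Allocator {
          def setTotalExpectedExecutors(targets: Map[Profile, Int]): Unit =
            targets.foreach { case (p, n) => println(s"profile ${p.id} -> $n executors") }
        }
        // Previously only targets(defaultProfile) was passed along; now the full map goes through.
        allocator.setTotalExpectedExecutors(Map(Profile(0) -> 3, Profile(1) -> 2))
      }
    }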

View file

@ -23,13 +23,15 @@ import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.SecurityManager
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.resource.ResourceProfile
private[spark] class KubernetesExecutorBuilder {
def buildFromFeatures(
conf: KubernetesExecutorConf,
secMgr: SecurityManager,
client: KubernetesClient): KubernetesExecutorSpec = {
client: KubernetesClient,
resourceProfile: ResourceProfile): KubernetesExecutorSpec = {
val initialPod = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
.map { file =>
KubernetesUtils.loadPodFromTemplate(
@ -40,7 +42,7 @@ private[spark] class KubernetesExecutorBuilder {
.getOrElse(SparkPod.initialPod())
val features = Seq(
new BasicExecutorFeatureStep(conf, secMgr),
new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
new ExecutorKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
@ -51,6 +53,8 @@ private[spark] class KubernetesExecutorBuilder {
initialPod,
executorKubernetesResources = Seq.empty)
// If using a template this will always get the resources from that and combine
// them with any Spark conf or ResourceProfile resources.
features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedResources = feature.getAdditionalKubernetesResources()
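The ResourceProfile handed to buildFromFeatures is built with the public stage-level scheduling API, the same way the new tests later in this commit do it. A small sketch (the amounts are arbitrary examples, not values from the patch):

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    object ProfileSketch {
      def main(args: Array[String]): Unit = {
        val ereq = new ExecutorResourceRequests().cores(4).memory("2g").memoryOverhead("1g")
        val treq = new TaskResourceRequests().cpus(2)
        val rp = new ResourceProfileBuilder().require(ereq).require(treq).build
        // These maps are what the executor feature step reads to size the executor pod.
        println(rp.executorResources)
        println(rp.taskResources)
      }
    }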

View file

@ -23,6 +23,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
class KubernetesConfSuite extends SparkFunSuite {
@ -96,6 +97,17 @@ class KubernetesConfSuite extends SparkFunSuite {
Some(DRIVER_POD))
assert(conf.executorId === EXECUTOR_ID)
assert(conf.driverPod.get === DRIVER_POD)
assert(conf.resourceProfileId === DEFAULT_RESOURCE_PROFILE_ID)
}
test("resource profile not default.") {
val conf = KubernetesConf.createExecutorConf(
new SparkConf(false),
EXECUTOR_ID,
KubernetesTestConf.APP_ID,
Some(DRIVER_POD),
10)
assert(conf.resourceProfileId === 10)
}
test("Image pull secrets.") {
@ -134,7 +146,8 @@ class KubernetesConfSuite extends SparkFunSuite {
assert(conf.labels === Map(
SPARK_EXECUTOR_ID_LABEL -> EXECUTOR_ID,
SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS)
assert(conf.annotations === CUSTOM_ANNOTATIONS)
assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS)

View file

@ -28,7 +28,7 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.resource.ResourceID
import org.apache.spark.resource.{ResourceID, ResourceProfile}
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.util.Utils
@ -191,7 +191,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
).foreach { case (name, resource, factor, expectedFactor) =>
test(s"memory overhead factor: $name") {
// Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
val driverMem = MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
val driverMem =
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
// main app resource, overhead factor
val sparkConf = new SparkConf(false)
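The test above picks a driver memory large enough that the computed overhead exceeds the 384 MiB floor: with the default 0.1 factor, 384 / 0.1 * 2 = 7680 MiB. A worked sketch of the formula being exercised:

    object OverheadSketch {
      // overhead = max(factor * memory, 384 MiB); 0.10 is the default JVM overhead factor.
      def overheadMiB(memoryMiB: Long, factor: Double = 0.10): Long =
        math.max((factor * memoryMiB).toLong, 384L)

      def main(args: Array[String]): Unit = {
        println(overheadMiB(1024)) // 384 -> the floor wins for small heaps
        println(overheadMiB(7680)) // 768 -> the factor wins above the floor
      }
    }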

View file

@ -34,7 +34,7 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso
import org.apache.spark.internal.config
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.resource.ResourceID
import org.apache.spark.resource._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.RpcEndpointAddress
@ -55,6 +55,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
private val RESOURCE_NAME_PREFIX = "base"
private val EXECUTOR_IMAGE = "executor-image"
private val LABELS = Map("label1key" -> "label1value")
private var defaultProfile: ResourceProfile = _
private val TEST_IMAGE_PULL_SECRETS = Seq("my-1secret-1", "my-secret-2")
private val TEST_IMAGE_PULL_SECRET_OBJECTS =
TEST_IMAGE_PULL_SECRETS.map { secret =>
@ -84,6 +85,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
.set(config.DRIVER_PORT, DRIVER_PORT)
.set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
.set("spark.kubernetes.resource.type", "java")
initDefaultProfile(baseConf)
}
private def newExecutorConf(
@ -95,10 +97,17 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
environment = environment)
}
private def initDefaultProfile(baseConf: SparkConf): Unit = {
ResourceProfile.clearDefaultProfile()
defaultProfile = ResourceProfile.getOrCreateDefaultProfile(baseConf)
}
test("test spark resource missing vendor") {
baseConf.set(EXECUTOR_GPU_ID.amountConf, "2")
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
val error = intercept[SparkException] {
initDefaultProfile(baseConf)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
}.getMessage()
assert(error.contains("Resource: gpu was requested, but vendor was not specified"))
@ -106,9 +115,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
test("test spark resource missing amount") {
baseConf.set(EXECUTOR_GPU_ID.vendorConf, "nvidia.com")
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
val error = intercept[SparkException] {
initDefaultProfile(baseConf)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
}.getMessage()
assert(error.contains("You must specify an amount for gpu"))
@ -124,7 +134,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
baseConf.set(testRInfo.rId.amountConf, testRInfo.count)
baseConf.set(testRInfo.rId.vendorConf, testRInfo.vendor)
}
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
initDefaultProfile(baseConf)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
assert(executor.container.getResources.getLimits.size() === 3)
@ -137,7 +149,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
}
test("basic executor pod has reasonable defaults") {
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
// The executor pod name and default labels.
@ -167,7 +180,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple"
baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, longPodNamePrefix)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
initDefaultProfile(baseConf)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
defaultProfile)
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}
@ -175,7 +190,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
val invalidPrefix = "abcdef-*_/[]{}+==.,;'\"-----------------------------------------------"
baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalidPrefix)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
initDefaultProfile(baseConf)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
defaultProfile)
val hostname = step.configurePod(SparkPod.initialPod()).pod.getSpec().getHostname()
assert(hostname.length <= 63)
assert(InternetDomainName.isValid(hostname))
@ -184,8 +201,10 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
test("classpath and extra java options get translated into environment variables") {
baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz")
initDefaultProfile(baseConf)
val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
checkEnv(executor, baseConf,
@ -198,7 +217,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
test("SPARK-32655 Support appId/execId placeholder in SPARK_EXECUTOR_DIRS") {
val kconf = newExecutorConf(environment = Map(ENV_EXECUTOR_DIRS ->
"/p1/SPARK_APPLICATION_ID/SPARK_EXECUTOR_ID,/p2/SPARK_APPLICATION_ID/SPARK_EXECUTOR_ID"))
val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
checkEnv(executor, baseConf, Map(ENV_EXECUTOR_DIRS ->
@ -208,8 +228,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
test("test executor pyspark memory") {
baseConf.set("spark.kubernetes.resource.type", "python")
baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)
initDefaultProfile(baseConf)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
// This is checking that basic executor + executorMemory = 1408 + 42 = 1450
assert(amountAndFormat(executor.container.getResources.getRequests.get("memory")) === "1450Mi")
@ -224,7 +245,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
secMgr.initializeAuth()
val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
secMgr)
secMgr, defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
checkEnv(executor, conf, Map(SecurityManager.ENV_AUTH_SECRET -> secMgr.getSecretKey()))
@ -240,15 +261,65 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
.set("spark.master", "k8s://127.0.0.1")
val secMgr = new SecurityManager(conf)
secMgr.initializeAuth()
val step = new BasicExecutorFeatureStep(KubernetesTestConf.createExecutorConf(sparkConf = conf),
secMgr)
secMgr, defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
assert(!KubernetesFeaturesTestUtils.containerHasEnvVar(
executor.container, SecurityManager.ENV_AUTH_SECRET))
}
test("SPARK-32661 test executor offheap memory") {
baseConf.set(MEMORY_OFFHEAP_ENABLED, true)
baseConf.set("spark.memory.offHeap.size", "42m")
initDefaultProfile(baseConf)
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf),
defaultProfile)
val executor = step.configurePod(SparkPod.initialPod())
// This is checking that basic executor memory (1024 + 384 overhead = 1408) + off-heap memory (42) = 1450
assert(amountAndFormat(executor.container.getResources.getRequests.get("memory")) === "1450Mi")
}
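The 1450Mi assertions in the pyspark and off-heap tests come from the same sum; a worked sketch of that arithmetic (MiB values, default 1g heap and the 384 MiB overhead floor):

    object ExecutorPodMemorySketch {
      def totalMiB(heap: Long, offHeap: Long = 0L, pyspark: Long = 0L, factor: Double = 0.10): Long =
        heap + math.max((factor * heap).toLong, 384L) + offHeap + pyspark

      def main(args: Array[String]): Unit = {
        println(totalMiB(heap = 1024, pyspark = 42)) // 1450, the pyspark memory test
        println(totalMiB(heap = 1024, offHeap = 42)) // 1450, the SPARK-32661 off-heap test
      }
    }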
test("basic resourceprofile") {
baseConf.set("spark.kubernetes.resource.type", "python")
initDefaultProfile(baseConf)
val rpb = new ResourceProfileBuilder()
val ereq = new ExecutorResourceRequests()
val treq = new TaskResourceRequests()
ereq.cores(4).memory("2g").memoryOverhead("1g").pysparkMemory("3g")
treq.cpus(2)
rpb.require(ereq).require(treq)
val rp = rpb.build
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
val executor = step.configurePod(SparkPod.initialPod())
assert(amountAndFormat(executor.container.getResources
.getRequests.get("cpu")) === "4")
assert(amountAndFormat(executor.container.getResources
.getLimits.get("memory")) === "6144Mi")
}
test("resourceprofile with gpus") {
val rpb = new ResourceProfileBuilder()
val ereq = new ExecutorResourceRequests()
val treq = new TaskResourceRequests()
ereq.cores(2).resource("gpu", 2, "/path/getGpusResources.sh", "nvidia.com")
treq.cpus(1)
rpb.require(ereq).require(treq)
val rp = rpb.build
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf), rp)
val executor = step.configurePod(SparkPod.initialPod())
assert(amountAndFormat(executor.container.getResources
.getLimits.get("memory")) === "1408Mi")
assert(amountAndFormat(executor.container.getResources
.getRequests.get("cpu")) === "2")
assert(executor.container.getResources.getLimits.size() === 2)
assert(amountAndFormat(executor.container.getResources.getLimits.get("nvidia.com/gpu")) === "2")
}
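In the two tests above, the 6144Mi limit is 2048 (memory) + 1024 (overhead) + 3072 (pyspark) MiB, and the GPU request surfaces as a vendor-qualified extended resource. A sketch of that name mapping; the helper below is illustrative, not Spark code:

    object GpuLimitSketch {
      // A profile resource ("gpu", amount, vendor "nvidia.com") becomes the K8s limit key
      // "nvidia.com/gpu" with the amount as its value.
      def limitEntry(name: String, amount: Long, vendor: String): (String, String) =
        s"$vendor/$name" -> amount.toString

      def main(args: Array[String]): Unit = {
        println(limitEntry("gpu", 2, "nvidia.com")) // (nvidia.com/gpu,2), as asserted above
      }
    }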
// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)
@ -265,11 +336,12 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
ENV_EXECUTOR_ID -> "1",
ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
ENV_EXECUTOR_CORES -> "1",
ENV_EXECUTOR_MEMORY -> "1g",
ENV_EXECUTOR_MEMORY -> "1024m",
ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_EXECUTOR_POD_IP -> null,
ENV_SPARK_USER -> Utils.getCurrentUserName())
ENV_SPARK_USER -> Utils.getCurrentUserName(),
ENV_RESOURCE_PROFILE_ID -> "0")
val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)

View file

@ -22,13 +22,15 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkPod
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
object ExecutorLifecycleTestUtils {
val TEST_SPARK_APP_ID = "spark-app-id"
def failedExecutorWithoutDeletion(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
def failedExecutorWithoutDeletion(
executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewStatus()
.withPhase("failed")
.withStartTime(Instant.now.toString)
@ -58,8 +60,8 @@ object ExecutorLifecycleTestUtils {
.build()
}
def pendingExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
def pendingExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewStatus()
.withPhase("pending")
.withStartTime(Instant.now.toString)
@ -67,8 +69,8 @@ object ExecutorLifecycleTestUtils {
.build()
}
def runningExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
def runningExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewStatus()
.withPhase("running")
.withStartTime(Instant.now.toString)
@ -82,8 +84,9 @@ object ExecutorLifecycleTestUtils {
* state (terminated with non-zero exit code). This pod is used for unit-testing the
* spark.kubernetes.executor.checkAllContainers Spark Conf.
*/
def runningExecutorWithFailedContainer(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
def runningExecutorWithFailedContainer(
executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewStatus()
.withPhase("running")
.addNewContainerStatus()
@ -103,32 +106,34 @@ object ExecutorLifecycleTestUtils {
.build()
}
def succeededExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
def succeededExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewStatus()
.withPhase("succeeded")
.endStatus()
.build()
}
def deletedExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
def deletedExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewMetadata()
.withDeletionTimestamp("523012521")
.endMetadata()
.build()
}
def unknownExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
def unknownExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewStatus()
.withPhase("unknown")
.endStatus()
.build()
}
def podWithAttachedContainerForId(executorId: Long): Pod = {
val sparkPod = executorPodWithId(executorId)
def podWithAttachedContainerForId(
executorId: Long,
rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
val sparkPod = executorPodWithId(executorId, rpId)
val podWithAttachedContainer = new PodBuilder(sparkPod.pod)
.editOrNewSpec()
.addToContainers(sparkPod.container)
@ -137,13 +142,14 @@ object ExecutorLifecycleTestUtils {
podWithAttachedContainer
}
def executorPodWithId(executorId: Long): SparkPod = {
def executorPodWithId(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): SparkPod = {
val pod = new PodBuilder()
.withNewMetadata()
.withName(s"spark-executor-$executorId")
.addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
.addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
.addToLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, rpId.toString)
.endMetadata()
.editOrNewSpec()
.withRestartPolicy("Never")
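The new label added above is what the allocator uses to tell which executor pods belong to which ResourceProfile. A hedged sketch of that grouping idea, using a stand-in label key rather than the real constant's value:

    object GroupByProfileSketch {
      final case class PodMeta(labels: Map[String, String]) // stand-in for pod metadata

      def countByProfile(pods: Seq[PodMeta], labelKey: String): Map[Int, Int] =
        pods.groupBy(_.labels.get(labelKey).map(_.toInt).getOrElse(0))
          .map { case (rpId, ps) => rpId -> ps.size }

      def main(args: Array[String]): Unit = {
        val pods = Seq(
          PodMeta(Map("rp-id" -> "0")),
          PodMeta(Map("rp-id" -> "2")),
          PodMeta(Map("rp-id" -> "2")))
        println(countByProfile(pods, "rp-id")) // Map(0 -> 1, 2 -> 2)
      }
    }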

View file

@ -34,6 +34,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
import org.apache.spark.resource._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
import org.apache.spark.util.ManualClock
@ -54,6 +55,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
.set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s")
private val defaultProfile: ResourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000
@ -89,7 +91,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient))).thenAnswer(executorPodAnswer())
meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer())
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
waitForExecutorPodsClock = new ManualClock(0L)
podsAllocatorUnderTest = new ExecutorPodsAllocator(
@ -99,7 +101,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Initially request executors in batches. Do not request another batch if the" +
" first has not finished.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
for (nextId <- 1 to podAllocationSize) {
verify(podOperations).create(podWithAttachedContainerForId(nextId))
}
@ -108,7 +110,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Request executors in batches. Allow another batch to be requested if" +
" all pending executors start running.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
for (execId <- 1 until podAllocationSize) {
snapshotsStore.updatePod(runningExecutor(execId))
}
@ -124,7 +126,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("When a current batch reaches error states immediately, re-request" +
" them on the next batch.") {
podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> podAllocationSize))
for (execId <- 1 until podAllocationSize) {
snapshotsStore.updatePod(runningExecutor(execId))
}
@ -145,7 +147,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(podOperations
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
.thenReturn(labeledPods)
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
verify(podOperations).create(podWithAttachedContainerForId(1))
waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
snapshotsStore.notifySubscribers()
@ -171,7 +173,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
waitForExecutorPodsClock.setTime(startTime)
// Target 1 executor, make sure it's requested, even with an empty initial snapshot.
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
verify(podOperations).create(podWithAttachedContainerForId(1))
// Mark executor as running, verify that subsequent allocation cycle is a no-op.
@ -181,7 +183,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
verify(podOperations, never()).delete()
// Request 3 more executors, make sure all are requested.
podsAllocatorUnderTest.setTotalExpectedExecutors(4)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4))
snapshotsStore.notifySubscribers()
verify(podOperations).create(podWithAttachedContainerForId(2))
verify(podOperations).create(podWithAttachedContainerForId(3))
@ -196,7 +198,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
// Scale down to 1. Pending executors (both acknowledged and not) should be deleted.
waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
podsAllocatorUnderTest.setTotalExpectedExecutors(1)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
snapshotsStore.notifySubscribers()
verify(podOperations, times(4)).create(any())
verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
@ -231,7 +233,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
podsAllocatorUnderTest.setTotalExpectedExecutors(5)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5))
verify(podOperations).create(podWithAttachedContainerForId(1))
verify(podOperations).create(podWithAttachedContainerForId(2))
verify(podOperations).create(podWithAttachedContainerForId(3))
@ -243,7 +245,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
snapshotsStore.updatePod(pendingExecutor(2))
// Newly created executors (both acknowledged and not) are protected by executorIdleTimeout
podsAllocatorUnderTest.setTotalExpectedExecutors(0)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 0))
snapshotsStore.notifySubscribers()
verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5")
verify(podOperations, never()).delete()
@ -255,6 +257,88 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
verify(podOperations).delete()
}
test("SPARK-33288: multiple resource profiles") {
when(podOperations
.withField("status.phase", "Pending"))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(podOperations)
when(podOperations
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(podOperations)
when(podOperations
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
.thenReturn(podOperations)
val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)
val rpb = new ResourceProfileBuilder()
val ereq = new ExecutorResourceRequests()
val treq = new TaskResourceRequests()
ereq.cores(4).memory("2g")
treq.cpus(2)
rpb.require(ereq).require(treq)
val rp = rpb.build
// Target 1 executor for default profile, 2 for other profile,
// make sure it's requested, even with an empty initial snapshot.
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 2))
verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
verify(podOperations).create(podWithAttachedContainerForId(2, rp.id))
verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
// Mark executor as running, verify that subsequent allocation cycle is a no-op.
snapshotsStore.updatePod(runningExecutor(1, defaultProfile.id))
snapshotsStore.updatePod(runningExecutor(2, rp.id))
snapshotsStore.updatePod(runningExecutor(3, rp.id))
snapshotsStore.notifySubscribers()
verify(podOperations, times(3)).create(any())
verify(podOperations, never()).delete()
// Request 3 more executors for default profile and 1 more for other profile,
// make sure all are requested.
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, rp -> 3))
snapshotsStore.notifySubscribers()
verify(podOperations).create(podWithAttachedContainerForId(4, defaultProfile.id))
verify(podOperations).create(podWithAttachedContainerForId(5, defaultProfile.id))
verify(podOperations).create(podWithAttachedContainerForId(6, defaultProfile.id))
verify(podOperations).create(podWithAttachedContainerForId(7, rp.id))
// Mark 4 as running, 5 and 7 as pending. Allocation cycle should do nothing.
snapshotsStore.updatePod(runningExecutor(4, defaultProfile.id))
snapshotsStore.updatePod(pendingExecutor(5, defaultProfile.id))
snapshotsStore.updatePod(pendingExecutor(7, rp.id))
snapshotsStore.notifySubscribers()
verify(podOperations, times(7)).create(any())
verify(podOperations, never()).delete()
// Scale down to 1 for both resource profiles. Pending executors
// (both acknowledged and not) should be deleted.
waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1))
snapshotsStore.notifySubscribers()
verify(podOperations, times(7)).create(any())
verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6")
verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7")
verify(podOperations, times(2)).delete()
assert(podsAllocatorUnderTest.isDeleted("5"))
assert(podsAllocatorUnderTest.isDeleted("6"))
assert(podsAllocatorUnderTest.isDeleted("7"))
// Update the snapshot to not contain the deleted executors, make sure the
// allocator cleans up internal state.
snapshotsStore.updatePod(deletedExecutor(5))
snapshotsStore.updatePod(deletedExecutor(6))
snapshotsStore.updatePod(deletedExecutor(7))
snapshotsStore.removeDeletedExecutors()
snapshotsStore.notifySubscribers()
assert(!podsAllocatorUnderTest.isDeleted("5"))
assert(!podsAllocatorUnderTest.isDeleted("6"))
assert(!podsAllocatorUnderTest.isDeleted("7"))
}
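For orientation, here is a hedged end-to-end sketch of how a user would exercise this path from application code. It assumes a reachable Kubernetes cluster (master URL and image supplied via spark-submit), dynamic allocation with shuffle tracking enabled, and a GPU discovery script present in the image; treat it as the shape of the API rather than a turnkey job.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    object StageLevelOnK8s {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("stage-level-k8s-sketch")
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
        val sc = new SparkContext(conf) // master provided by spark-submit (k8s://...)

        val ereq = new ExecutorResourceRequests()
          .cores(4)
          .memory("6g")
          .resource("gpu", 1, "/opt/spark/examples/src/main/scripts/getGpusResources.sh", "nvidia.com")
        val treq = new TaskResourceRequests().resource("gpu", 1)
        val rp = new ResourceProfileBuilder().require(ereq).require(treq).build

        // Stages computed from this RDD run on executors requested with rp's resources.
        val counts = sc.parallelize(1 to 10000, 16).map(_ % 10).withResources(rp).countByValue()
        println(counts)
        sc.stop()
      }
    }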
test("SPARK-33262: pod allocator does not stall with pending pods") {
when(podOperations
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
@ -269,7 +353,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "3", "4", "5", "6"))
.thenReturn(podOperations)
podsAllocatorUnderTest.setTotalExpectedExecutors(6)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6))
// Initial request of pods
verify(podOperations).create(podWithAttachedContainerForId(1))
verify(podOperations).create(podWithAttachedContainerForId(2))
@ -292,6 +376,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt), Seq.empty)
KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
k8sConf.resourceProfileId.toInt), Seq.empty)
}
}

View file

@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.resource.ResourceProfileManager
import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@ -89,6 +89,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private val listenerBus = new LiveListenerBus(new SparkConf())
private val resourceProfileManager = new ResourceProfileManager(sparkConf, listenerBus)
private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
before {
MockitoAnnotations.initMocks(this)
@ -118,7 +119,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
test("Start all components") {
schedulerBackendUnderTest.start()
verify(podAllocator).setTotalExpectedExecutors(3)
verify(podAllocator).setTotalExpectedExecutors(Map(defaultProfile -> 3))
verify(podAllocator).start(TEST_SPARK_APP_ID)
verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
verify(watchEvents).start(TEST_SPARK_APP_ID)

View file

@ -21,6 +21,7 @@ import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.k8s._
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.resource.ResourceProfile
class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
@ -32,7 +33,8 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
sparkConf.set("spark.driver.host", "https://driver.host.com")
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
val secMgr = new SecurityManager(sparkConf)
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client).pod
val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, defaultProfile).pod
}
}

View file

@ -85,6 +85,7 @@ case "$1" in
--cores $SPARK_EXECUTOR_CORES
--app-id $SPARK_APPLICATION_ID
--hostname $SPARK_EXECUTOR_POD_IP
--resourceProfileId $SPARK_RESOURCE_PROFILE_ID
)
;;

View file

@ -63,6 +63,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper}
@ -93,7 +94,8 @@ private[spark] class Client(
private val amMemoryOverhead = {
val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
sparkConf.get(amMemoryOverheadEntry).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong,
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
}
private val amCores = if (isClusterMode) {
sparkConf.get(DRIVER_CORES)
@ -104,9 +106,10 @@ private[spark] class Client(
// Executor related configurations
private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong,
ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
private val isPython = sparkConf.get(IS_PYTHON_APP)
private val pysparkWorkerMemory: Int = if (isPython) {
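The container size requested from YARN is the sum of these pieces (see also the removed defaultResource block in the YarnAllocator hunk below). A worked sketch of that sum with the same 384 MiB overhead floor:

    object YarnContainerMemorySketch {
      // container MiB = heap + off-heap + overhead + pyspark worker memory (python apps only)
      def containerMiB(heap: Long, offHeap: Long, pyspark: Long, factor: Double = 0.10): Long =
        heap + offHeap + math.max((factor * heap).toLong, 384L) + pyspark

      def main(args: Array[String]): Unit = {
        println(containerMiB(heap = 4096, offHeap = 0, pyspark = 0))   // 4505
        println(containerMiB(heap = 2048, offHeap = 512, pyspark = 0)) // 2944
      }
    }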

View file

@ -43,6 +43,8 @@ private object ResourceRequestHelper extends Logging {
private val RESOURCE_NOT_FOUND = "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException"
val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
private[yarn] val resourceNameMapping =
Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG)
@volatile private var numResourceErrors: Int = 0
private[yarn] def getYarnResourcesAndAmounts(
@ -76,7 +78,7 @@ private object ResourceRequestHelper extends Logging {
confPrefix: String,
sparkConf: SparkConf
): Map[String, String] = {
Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map {
resourceNameMapping.map {
case (rName, yarnName) =>
(yarnName -> sparkConf.get(new ResourceID(confPrefix, rName).amountConf, "0"))
}.filter { case (_, count) => count.toLong > 0 }
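The new resourceNameMapping centralizes the Spark-name to YARN-name translation used here and by the ResourceProfile path in YarnAllocator. A small sketch of its effect (the helper is illustrative, not Spark code):

    object YarnResourceNameSketch {
      val mapping = Map("gpu" -> "yarn.io/gpu", "fpga" -> "yarn.io/fpga")

      // Known names are translated to YARN's built-in types; anything else passes through
      // and must be configured on the YARN side explicitly.
      def toYarn(requested: Map[String, Long]): Map[String, Long] =
        requested.map { case (name, amount) => mapping.getOrElse(name, name) -> amount }

      def main(args: Array[String]): Unit = {
        println(toYarn(Map("gpu" -> 2, "my.vendor/acc" -> 1)))
        // Map(yarn.io/gpu -> 2, my.vendor/acc -> 1)
      }
    }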

View file

@ -37,7 +37,6 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
@ -162,34 +161,7 @@ private[yarn] class YarnAllocator(
private val allocatorNodeHealthTracker =
new YarnAllocatorNodeHealthTracker(sparkConf, amClient, failureTracker)
private val isPythonApp = sparkConf.get(IS_PYTHON_APP)
// Executor memory in MiB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) {
sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
} else {
0
}
// Number of cores per executor for the default profile
protected val defaultExecutorCores = sparkConf.get(EXECUTOR_CORES)
private val executorResourceRequests =
getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
// Resource capability requested for each executor for the default profile
private[yarn] val defaultResource: Resource = {
val resource: Resource = Resource.newInstance(
executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory,
defaultExecutorCores)
ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
logDebug(s"Created resource capability: $resource")
resource
}
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
"ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
@ -211,11 +183,10 @@ private[yarn] class YarnAllocator(
new HashMap[String, mutable.Set[ContainerId]]()
runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, mutable.HashSet[String]())
numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = new AtomicInteger(0)
targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
rpIdToYarnResource.put(DEFAULT_RESOURCE_PROFILE_ID, defaultResource)
rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val initTargetExecNum = SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initTargetExecNum
val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
createYarnResourceForResourceProfile(defaultProfile)
}
initDefaultProfile()
@ -302,48 +273,55 @@ private[yarn] class YarnAllocator(
}
// if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it
private def createYarnResourceForResourceProfile(rp: ResourceProfile): Unit = synchronized {
if (!rpIdToYarnResource.contains(rp.id)) {
// track the resource profile if not already there
getOrUpdateRunningExecutorForRPId(rp.id)
logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
val resourcesWithDefaults =
ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp,
ResourceRequestHelper.resourceNameMapping)
val customSparkResources =
resourcesWithDefaults.customResources.map { case (name, execReq) =>
(name, execReq.amount.toString)
private def createYarnResourceForResourceProfile(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = synchronized {
resourceProfileToTotalExecs.foreach { case (rp, num) =>
if (!rpIdToYarnResource.contains(rp.id)) {
// Start with the application or default settings
var heapMem = executorMemory.toLong
var offHeapMem = executorOffHeapMemory.toLong
var overheadMem = memoryOverhead.toLong
var pysparkMem = pysparkWorkerMemory.toLong
var cores = defaultExecutorCores
val customResources = new mutable.HashMap[String, String]
// track the resource profile if not already there
getOrUpdateRunningExecutorForRPId(rp.id)
logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
val execResources = rp.executorResources
execResources.foreach { case (r, execReq) =>
r match {
case ResourceProfile.MEMORY =>
heapMem = execReq.amount
case ResourceProfile.OVERHEAD_MEM =>
overheadMem = execReq.amount
case ResourceProfile.PYSPARK_MEM =>
pysparkMem = execReq.amount
case ResourceProfile.OFFHEAP_MEM =>
offHeapMem = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf, execReq)
case ResourceProfile.CORES =>
cores = execReq.amount.toInt
case "gpu" =>
customResources(YARN_GPU_RESOURCE_CONFIG) = execReq.amount.toString
case "fpga" =>
customResources(YARN_FPGA_RESOURCE_CONFIG) = execReq.amount.toString
case rName =>
customResources(rName) = execReq.amount.toString
}
} }
val totalMem = (heapMem + offHeapMem + overheadMem + pysparkMem).toInt // There is a difference in the way custom resources are handled between
val resource = Resource.newInstance(totalMem, cores) // the base default profile and custom ResourceProfiles. To allow for the user
ResourceRequestHelper.setResourceRequests(customResources.toMap, resource) // to request YARN containers with extra resources without Spark scheduling on
logDebug(s"Created resource capability: $resource") // them, the user can specify resources via the <code>spark.yarn.executor.resource.</code>
rpIdToYarnResource.putIfAbsent(rp.id, resource) // config. Those configs are only used in the base default profile though and do
rpIdToResourceProfile(rp.id) = rp // not get propogated into any other custom ResourceProfiles. This is because
// there would be no way to remove them if you wanted a stage to not have them.
// This results in your default profile getting custom resources defined in
// <code>spark.yarn.executor.resource.</code> plus spark defined resources of
// GPU or FPGA. Spark converts GPU and FPGA resources into the YARN built in
// types <code>yarn.io/gpu</code>) and <code>yarn.io/fpga</code>, but does not
// know the mapping of any other resources. Any other Spark custom resources
// are not propogated to YARN for the default profile. So if you want Spark
// to schedule based off a custom resource and have it requested from YARN, you
// must specify it in both YARN (<code>spark.yarn.{driver/executor}.resource.</code>)
// and Spark (<code>spark.{driver/executor}.resource.</code>) configs. Leave the Spark
// config off if you only want YARN containers with the extra resources but Spark not to
// schedule using them. Now for custom ResourceProfiles, it doesn't currently have a way
// to only specify YARN resources without Spark scheduling off of them. This means for
// custom ResourceProfiles we propogate all the resources defined in the ResourceProfile
// to YARN. We still convert GPU and FPGA to the YARN build in types as well. This requires
// that the name of any custom resources you specify match what they are defined as in YARN.
val customResources = if (rp.id == DEFAULT_RESOURCE_PROFILE_ID) {
getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
customSparkResources.filterKeys { r =>
(r == YARN_GPU_RESOURCE_CONFIG || r == YARN_FPGA_RESOURCE_CONFIG)
}
} else {
customSparkResources
} }
val resource =
Resource.newInstance(resourcesWithDefaults.totalMemMiB, resourcesWithDefaults.cores)
ResourceRequestHelper.setResourceRequests(customResources, resource)
logDebug(s"Created resource capability: $resource")
rpIdToYarnResource.putIfAbsent(rp.id, resource)
rpIdToResourceProfile(rp.id) = rp
} }
} }
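The comment block in the new createYarnResourceForResourceProfile above describes how custom resources reach YARN: the default profile takes YARN-only resources from `spark.yarn.executor.resource.*`, while a custom ResourceProfile propagates everything it declares. A minimal sketch of the user side of that distinction follows; the resource names, amounts, and discovery-script path are hypothetical and not taken from this PR, and extra ResourceProfiles assume dynamic allocation is enabled.

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
import org.apache.spark.sql.SparkSession

// Default profile (per the comment above): a YARN-only resource that Spark will not
// schedule on is requested with spark.yarn.executor.resource.<name>.amount, while a
// Spark-scheduled GPU is requested with spark.executor.resource.gpu.amount; Spark maps
// "gpu"/"fpga" onto yarn.io/gpu and yarn.io/fpga itself.

// Custom ResourceProfile: everything in the profile is propagated to YARN, so any
// resource name other than gpu/fpga must match the name YARN knows it by.
val execReqs = new ExecutorResourceRequests()
  .memory("6g")
  .cores(4)
  .resource("gpu", 2, "/opt/scripts/getGpusResources.sh")  // hypothetical discovery script
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val rp = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

val spark = SparkSession.builder().appName("rp-sketch").getOrCreate()
// Stages computing this RDD request YARN containers built from `rp` instead of the default profile.
spark.sparkContext.parallelize(1 to 100, 4).withResources(rp).map(_ * 2).collect()
```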
@@ -370,9 +348,8 @@ private[yarn] class YarnAllocator(
     this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
     this.hostToLocalTaskCountPerResourceProfileId = hostToLocalTaskCountPerResourceProfileId
-    createYarnResourceForResourceProfile(resourceProfileToTotalExecs)
     val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
+      createYarnResourceForResourceProfile(rp)
       if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
         logInfo(s"Driver requested a total number of $numExecs executor(s) " +
           s"for resource profile id: ${rp.id}.")
@@ -477,7 +454,7 @@ private[yarn] class YarnAllocator(
       var requestContainerMessage = s"Will request $missing executor container(s) for " +
         s" ResourceProfile Id: $rpId, each with " +
         s"${resource.getVirtualCores} core(s) and " +
-        s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
+        s"${resource.getMemory} MB memory."
       if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
         ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) {
         requestContainerMessage ++= s" with custom resources: " + resource.toString
@@ -723,9 +700,10 @@ private[yarn] class YarnAllocator(
       }
       val rp = rpIdToResourceProfile(rpId)
+      val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
       val containerMem = rp.executorResources.get(ResourceProfile.MEMORY).
-        map(_.amount.toInt).getOrElse(executorMemory)
-      val containerCores = rp.getExecutorCores.getOrElse(defaultExecutorCores)
+        map(_.amount).getOrElse(defaultResources.executorMemoryMiB).toInt
+      val containerCores = rp.getExecutorCores.getOrElse(defaultResources.cores)
       val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
       if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
         getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()

---- YarnSparkHadoopUtil.scala ----

@@ -38,7 +38,6 @@ object YarnSparkHadoopUtil {
   // the common cases. Memory overhead tends to grow with container size.
   val MEMORY_OVERHEAD_FACTOR = 0.10
-  val MEMORY_OVERHEAD_MIN = 384L

   val ANY_HOST = "*"
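With MEMORY_OVERHEAD_MIN moved to ResourceProfile.MEMORY_OVERHEAD_MIN_MIB, the sizing rule itself is unchanged: unless spark.executor.memoryOverhead is set explicitly, the overhead is max(0.10 * executor memory, 384 MiB). A small self-contained sketch of that arithmetic, using an illustrative 10 GiB executor rather than any value from this PR:

```scala
// Overhead rule used when building the container request (illustrative values).
val executorMemoryMiB = 10240L                 // e.g. --executor-memory 10g
val overheadFactor = 0.10                      // MEMORY_OVERHEAD_FACTOR
val overheadMinMiB = 384L                      // ResourceProfile.MEMORY_OVERHEAD_MIN_MIB
val overheadMiB =
  math.max((overheadFactor * executorMemoryMiB).toLong, overheadMinMiB)   // 1024 MiB
val containerMiB = executorMemoryMiB + overheadMiB                        // 11264 MiB
println(s"container request: $containerMiB MiB")
```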
@@ -184,33 +183,12 @@ object YarnSparkHadoopUtil {
     ConverterUtils.toContainerId(containerIdString)
   }

-  /**
-   * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
-   */
-  def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
-    val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
-    checkOffHeapEnabled(sparkConf, sizeInMB).toInt
-  }
-
   /**
    * Get offHeap memory size from [[ExecutorResourceRequest]]
    * return 0 if MEMORY_OFFHEAP_ENABLED is false.
    */
   def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf,
     execRequest: ExecutorResourceRequest): Long = {
-    checkOffHeapEnabled(sparkConf, execRequest.amount)
+    Utils.checkOffHeapEnabled(sparkConf, execRequest.amount)
   }
-
-  /**
-   * return 0 if MEMORY_OFFHEAP_ENABLED is false.
-   */
-  def checkOffHeapEnabled(sparkConf: SparkConf, offHeapSize: Long): Long = {
-    if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
-      require(offHeapSize > 0,
-        s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
-      offHeapSize
-    } else {
-      0
-    }
-  }
 }
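The hunk above removes the YARN-local copy of checkOffHeapEnabled and calls it through Utils instead. For reference, a standalone paraphrase of that check, with plain arguments in place of a SparkConf so it runs on its own, is sketched below; the helper name here is made up for the sketch.

```scala
// Mirrors the check: off-heap memory only counts toward the container when off-heap
// is enabled, and then the configured size must be positive.
def offHeapSizeForContainerMiB(offHeapEnabled: Boolean, offHeapSizeMiB: Long): Long = {
  if (offHeapEnabled) {
    require(offHeapSizeMiB > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    offHeapSizeMiB
  } else {
    0L
  }
}

offHeapSizeForContainerMiB(offHeapEnabled = false, offHeapSizeMiB = 512)  // returns 0
offHeapSizeForContainerMiB(offHeapEnabled = true, offHeapSizeMiB = 512)   // returns 512
```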

---- YarnAllocatorSuite.scala ----

@@ -75,7 +75,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   // priority has to be 0 to match default profile id
   val RM_REQUEST_PRIORITY = Priority.newInstance(0)
   val defaultRPId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-  val defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+  var defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConf)

   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -114,6 +114,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     for ((name, value) <- additionalConfigs) {
       sparkConfClone.set(name, value)
     }
+    // different spark confs means we need to reinit the default profile
+    ResourceProfile.clearDefaultProfile()
+    defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConfClone)
     val allocator = new YarnAllocator(
       "not used",
@@ -268,12 +271,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
     handler.updateResourceRequests()
-    val container = createContainer("host1", resource = handler.defaultResource)
+    val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+    val container = createContainer("host1", resource = defaultResource)
     handler.handleAllocatedContainers(Array(container))
     // get amount of memory and vcores from resource, so effectively skipping their validation
-    val expectedResources = Resource.newInstance(handler.defaultResource.getMemory(),
-      handler.defaultResource.getVirtualCores)
+    val expectedResources = Resource.newInstance(defaultResource.getMemory(),
+      defaultResource.getVirtualCores)
     setResourceRequests(Map("gpu" -> "2G"), expectedResources)
     val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
@@ -296,7 +300,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
     handler.updateResourceRequests()
-    val yarnRInfo = ResourceRequestTestHelper.getResources(handler.defaultResource)
+    val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+    val yarnRInfo = ResourceRequestTestHelper.getResources(defaultResource)
     val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) ).toMap
     assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
     assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
@@ -656,9 +661,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
       val (handler, _) = createAllocator(maxExecutors = 1,
         additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
-      val memory = handler.defaultResource.getMemory
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemory
       assert(memory ==
-        executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
+        executorMemory + offHeapMemoryInMB + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)
     } finally {
       sparkConf.set(MEMORY_OFFHEAP_ENABLED, originalOffHeapEnabled)
       sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)

---- YarnSparkHadoopUtilSuite.scala ----

@@ -142,31 +142,4 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     }
   }
-
-  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
-    val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(new SparkConf())
-    assert(executorOffHeapMemory == 0)
-  }
-
-  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
-    val offHeapMemoryInMB = 50
-    val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
-    val sparkConf = new SparkConf()
-      .set(MEMORY_OFFHEAP_ENABLED, true)
-      .set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
-    val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
-    assert(executorOffHeapMemory == offHeapMemoryInMB)
-  }
-
-  test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
-    "but MEMORY_OFFHEAP_SIZE not config scene") {
-    val sparkConf = new SparkConf()
-      .set(MEMORY_OFFHEAP_ENABLED, true)
-    val expected =
-      s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
-    val message = intercept[IllegalArgumentException] {
-      YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
-    }.getMessage
-    assert(message.contains(expected))
-  }
 }