[SPARK-29149][YARN] Update YARN cluster manager For Stage Level Scheduling

### What changes were proposed in this pull request?

YARN-side changes for stage-level scheduling. The previous PR, covering the dynamic allocation changes, was https://github.com/apache/spark/pull/27313

Modified the data structures to store state on a per-ResourceProfile basis.
I tried to keep the code changes to a minimum: the main request loop now iterates over each ResourceProfile, and the logic inside it for each profile stays very close to what it was before.
On submission we now have to give each ResourceProfile a separate YARN Priority, because YARN doesn't support requesting containers with different resources at the same Priority. We simply use the profile id as the priority level.
Using a distinct Priority per profile also makes it easier, when containers come back, to match them to the ResourceProfile they were requested for.
The expectation is that YARN will only give you a container with the resource amounts you requested or more; it should never give you a container that doesn't satisfy your resource requests.
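
To make the priority scheme concrete, here is a minimal, illustrative Scala sketch of the id-to-Priority mapping. The `ProfilePrioritySketch` object and its method names are mine, not part of the patch; the real logic lives in the YarnAllocator changes below, and the sketch assumes the Hadoop YARN client API is on the classpath:

```scala
// Illustrative sketch only: a ResourceProfile id doubles as the YARN Priority
// for its container requests, and the priority on an allocated container is
// mapped back to the profile it was requested for.
import org.apache.hadoop.yarn.api.records.Priority

object ProfilePrioritySketch {
  // YARN won't accept container requests with different resources at the same
  // Priority, so each ResourceProfile gets its own priority (its id).
  def containerPriority(rpId: Int): Priority = Priority.newInstance(rpId)

  // When a container is allocated, its priority tells us which profile asked for it.
  def resourceProfileIdOf(priority: Priority): Int = priority.getPriority

  def main(args: Array[String]): Unit = {
    val defaultRpId = 0 // ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
    val p = containerPriority(defaultRpId)
    assert(resourceProfileIdOf(p) == defaultRpId)
    println(s"profile $defaultRpId -> priority ${p.getPriority}")
  }
}
```

The relative ordering of these priorities doesn't matter in practice: Spark finishes a stage before starting a later one, and if two run in parallel the priority is irrelevant, so the value only needs to be unique per profile.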

If you want to see the full feature changes, you can look at https://github.com/apache/spark/pull/27053/files for reference.

### Why are the changes needed?

To add YARN support for stage-level scheduling.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

Tested manually on a YARN cluster, and with unit tests.

Closes #27583 from tgravescs/SPARK-29149.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Committed by Thomas Graves on 2020-02-28 15:23:33 -06:00
commit 0e2ca11d80 (parent 6c0c41fa0d)
12 changed files with 663 additions and 316 deletions


@ -117,9 +117,9 @@ private[spark] object CoarseGrainedClusterMessages {
// Request executors by specifying the new total number of executors desired
// This includes executors already pending or running
case class RequestExecutors(
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
resourceProfileToTotalExecs: Map[ResourceProfile, Int],
numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
hostToLocalTaskCount: Map[Int, Map[String, Int]],
nodeBlacklist: Set[String])
extends CoarseGrainedClusterMessage


@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@ -61,6 +61,7 @@ class HeartbeatReceiverSuite
PrivateMethod[collection.Map[String, Long]](Symbol("executorLastSeen"))
private val _executorTimeoutMs = PrivateMethod[Long](Symbol("executorTimeoutMs"))
private val _killExecutorThread = PrivateMethod[ExecutorService](Symbol("killExecutorThread"))
var conf: SparkConf = _
/**
* Before each test, set up the SparkContext and a custom [[HeartbeatReceiver]]
@ -68,7 +69,7 @@ class HeartbeatReceiverSuite
*/
override def beforeEach(): Unit = {
super.beforeEach()
val conf = new SparkConf()
conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
.set(DYN_ALLOCATION_TESTING, true)
@ -76,7 +77,6 @@ class HeartbeatReceiverSuite
scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
when(scheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
when(scheduler.sc).thenReturn(sc)
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@ -164,9 +164,10 @@ class HeartbeatReceiverSuite
test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
// Set up a fake backend and cluster manager to simulate killing executors
val rpcEnv = sc.env.rpcEnv
val fakeClusterManager = new FakeClusterManager(rpcEnv)
val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
val fakeSchedulerBackend =
new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef, sc.resourceProfileManager)
when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
// Register fake executors with our fake scheduler backend
@ -282,18 +283,16 @@ private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpo
private class FakeSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
clusterManagerEndpoint: RpcEndpointRef)
clusterManagerEndpoint: RpcEndpointRef,
resourceProfileManager: ResourceProfileManager)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
protected override def doRequestTotalExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
RequestExecutors(
resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf)),
numLocalityAwareTasksPerResourceProfileId(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
rpHostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID),
Set.empty))
}
RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId,
rpHostToLocalTaskCount, Set.empty))
}
protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds))
@ -303,7 +302,7 @@ private class FakeSchedulerBackend(
/**
* Dummy cluster manager to simulate responses to executor allocation requests.
*/
private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoint {
private class FakeClusterManager(override val rpcEnv: RpcEnv, conf: SparkConf) extends RpcEndpoint {
private var targetNumExecutors = 0
private val executorIdsToKill = new mutable.HashSet[String]
@ -311,8 +310,9 @@ private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoin
def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestExecutors(requestedTotal, _, _, _) =>
targetNumExecutors = requestedTotal
case RequestExecutors(resourceProfileToTotalExecs, _, _, _) =>
targetNumExecutors =
resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf))
context.reply(true)
case KillExecutors(executorIds) =>
executorIdsToKill ++= executorIds


@ -593,7 +593,7 @@ private[spark] class ApplicationMaster(
}
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
val numPendingAllocate = allocator.getNumContainersPendingAllocate
var sleepStartNs = 0L
var sleepInterval = 200L // ms
allocatorLock.synchronized {
@ -778,8 +778,11 @@ private[spark] class ApplicationMaster(
case r: RequestExecutors =>
Option(allocator) match {
case Some(a) =>
if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
if (a.requestTotalExecutorsWithPreferredLocalities(
r.resourceProfileToTotalExecs,
r.numLocalityAwareTasksPerResourceProfileId,
r.hostToLocalTaskCount,
r.nodeBlacklist)) {
resetAllocatorInterval()
}
context.reply(true)


@ -40,11 +40,11 @@ private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: Yarn
})
metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.numLocalityAwareTasks
override def getValue: Int = yarnAllocator.getNumLocalityAwareTasks
})
metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] {
override def getValue: Int = yarnAllocator.numContainersPendingAllocate
override def getValue: Int = yarnAllocator.getNumContainersPendingAllocate
})
}


@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile
private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
@ -82,7 +82,6 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack
private[yarn] class LocalityPreferredContainerPlacementStrategy(
val sparkConf: SparkConf,
val yarnConf: Configuration,
val resource: Resource,
resolver: SparkRackResolver) {
/**
@ -96,6 +95,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
* containers
* @param localityMatchedPendingAllocations A sequence of pending container request which
* matches the localities of current required tasks.
* @param rp The ResourceProfile associated with this container.
* @return node localities and rack localities, each locality is an array of string,
* the length of localities is the same as number of containers
*/
@ -104,11 +104,12 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
numLocalityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
localityMatchedPendingAllocations: Seq[ContainerRequest]
localityMatchedPendingAllocations: Seq[ContainerRequest],
rp: ResourceProfile
): Array[ContainerLocalityPreferences] = {
val updatedHostToContainerCount = expectedHostToContainerCount(
numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,
localityMatchedPendingAllocations)
localityMatchedPendingAllocations, rp)
val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
// The number of containers to allocate, divided into two groups, one with preferred locality,
@ -152,11 +153,14 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
}
/**
* Calculate the number of executors need to satisfy the given number of pending tasks.
* Calculate the number of executors needed to satisfy the given number of pending tasks for
* the ResourceProfile.
*/
private def numExecutorsPending(numTasksPending: Int): Int = {
val coresPerExecutor = resource.getVirtualCores
(numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
private def numExecutorsPending(
numTasksPending: Int,
rp: ResourceProfile): Int = {
val tasksPerExec = rp.maxTasksPerExecutor(sparkConf)
math.ceil(numTasksPending / tasksPerExec.toDouble).toInt
}
/**
@ -175,14 +179,15 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
localityMatchedPendingAllocations: Seq[ContainerRequest]
localityMatchedPendingAllocations: Seq[ContainerRequest],
rp: ResourceProfile
): Map[String, Int] = {
val totalLocalTaskNum = hostToLocalTaskCount.values.sum
val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations)
hostToLocalTaskCount.map { case (host, count) =>
val expectedCount =
count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
count.toDouble * numExecutorsPending(localityAwareTasks, rp) / totalLocalTaskNum
// Take the locality of pending containers into consideration
val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
pendingHostToContainersMap.getOrElse(host, 0.0)


@ -227,6 +227,17 @@ private object ResourceRequestHelper extends Logging {
resourceInformation
}
def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = {
try {
// Use reflection as this uses APIs only available in Hadoop 3
val getResourcesMethod = resource.getClass().getMethod("getResources")
val resources = getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]]
if (resources.nonEmpty) true else false
} catch {
case _: NoSuchMethodException => false
}
}
/**
* Checks whether Hadoop 2.x or 3 is used as a dependency.
* In case of Hadoop 3 and later, the ResourceInformation class


@ -17,9 +17,9 @@
package org.apache.spark.deploy.yarn
import java.util.Collections
import java.util.concurrent._
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable
@ -39,6 +39,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@ -75,19 +76,69 @@ private[yarn] class YarnAllocator(
import YarnAllocator._
// Visible for testing.
val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
@GuardedBy("this")
val allocatedHostToContainersMapPerRPId =
new HashMap[Int, HashMap[String, collection.mutable.Set[ContainerId]]]
@GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]
// Containers that we no longer care about. We've either already told the RM to release them or
// will on the next heartbeat. Containers get removed from this map after the RM tells us they've
// completed.
private val releasedContainers = Collections.newSetFromMap[ContainerId](
new ConcurrentHashMap[ContainerId, java.lang.Boolean])
@GuardedBy("this")
private val releasedContainers = collection.mutable.HashSet[ContainerId]()
private val runningExecutors = Collections.newSetFromMap[String](
new ConcurrentHashMap[String, java.lang.Boolean]())
@GuardedBy("this")
private val runningExecutorsPerResourceProfileId = new HashMap[Int, mutable.Set[String]]()
private val numExecutorsStarting = new AtomicInteger(0)
@GuardedBy("this")
private val numExecutorsStartingPerResourceProfileId = new HashMap[Int, AtomicInteger]
@GuardedBy("this")
private val targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int]
// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
// list of requesters that should be responded to once we find out why the given executor
// was lost.
@GuardedBy("this")
private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
// Maintain loss reasons for already released executors, it will be added when executor loss
// reason is got from AM-RM call, and be removed after querying this loss reason.
@GuardedBy("this")
private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
@GuardedBy("this")
private[yarn] val executorIdToContainer = new HashMap[String, Container]
@GuardedBy("this")
private var numUnexpectedContainerRelease = 0L
@GuardedBy("this")
private val containerIdToExecutorIdAndResourceProfileId = new HashMap[ContainerId, (String, Int)]
// Use a ConcurrentHashMap because this is used in matchContainerToRequest, which is called
// from the rack resolver thread where synchronize(this) on this would cause a deadlock.
@GuardedBy("ConcurrentHashMap")
private[yarn] val rpIdToYarnResource = new ConcurrentHashMap[Int, Resource]()
// note currently we don't remove ResourceProfiles
@GuardedBy("this")
private[yarn] val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
// A map of ResourceProfile id to a map of preferred hostname and possible
// task numbers running on it.
@GuardedBy("this")
private var hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]] =
Map(DEFAULT_RESOURCE_PROFILE_ID -> Map.empty)
// ResourceProfile Id to number of tasks that have locality preferences in active stages
@GuardedBy("this")
private[yarn] var numLocalityAwareTasksPerResourceProfileId: Map[Int, Int] =
Map(DEFAULT_RESOURCE_PROFILE_ID -> 0)
/**
* Used to generate a unique ID per executor
@ -102,6 +153,7 @@ private[yarn] class YarnAllocator(
*
* @see SPARK-12864
*/
@GuardedBy("this")
private var executorIdCounter: Int =
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
@ -110,26 +162,6 @@ private[yarn] class YarnAllocator(
private val allocatorBlacklistTracker =
new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)
@volatile private var targetNumExecutors =
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
// list of requesters that should be responded to once we find out why the given executor
// was lost.
private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
// Maintain loss reasons for already released executors, it will be added when executor loss
// reason is got from AM-RM call, and be removed after querying this loss reason.
private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
private var numUnexpectedContainerRelease = 0L
private val containerIdToExecutorId = new HashMap[ContainerId, String]
// Executor memory in MiB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Executor offHeap memory in MiB.
@ -142,17 +174,18 @@ private[yarn] class YarnAllocator(
} else {
0
}
// Number of cores per executor.
protected val executorCores = sparkConf.get(EXECUTOR_CORES)
// Number of cores per executor for the default profile
protected val defaultExecutorCores = sparkConf.get(EXECUTOR_CORES)
private val executorResourceRequests =
getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
// Resource capability requested for each executor
private[yarn] val resource: Resource = {
val resource = Resource.newInstance(
executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
// Resource capability requested for each executor for the default profile
private[yarn] val defaultResource: Resource = {
val resource: Resource = Resource.newInstance(
executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory,
defaultExecutorCores)
ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
logDebug(s"Created resource capability: $resource")
resource
@ -166,19 +199,42 @@ private[yarn] class YarnAllocator(
private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
// Number of tasks that have locality preferences in active stages
private[yarn] var numLocalityAwareTasks: Int = 0
// A container placement strategy based on pending tasks' locality preference
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
def getNumExecutorsRunning: Int = runningExecutors.size()
// The default profile is always present so we need to initialize the datastructures keyed by
// ResourceProfile id to ensure its present if things start running before a request for
// executors could add it. This approach is easier then going and special casing everywhere.
private def initDefaultProfile(): Unit = synchronized {
allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
new HashMap[String, mutable.Set[ContainerId]]()
runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, mutable.HashSet[String]())
numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = new AtomicInteger(0)
targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
rpIdToYarnResource.put(DEFAULT_RESOURCE_PROFILE_ID, defaultResource)
rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
ResourceProfile.getOrCreateDefaultProfile(sparkConf)
}
def getNumReleasedContainers: Int = releasedContainers.size()
initDefaultProfile()
def getNumExecutorsRunning: Int = synchronized {
runningExecutorsPerResourceProfileId.values.map(_.size).sum
}
def getNumLocalityAwareTasks: Int = synchronized {
numLocalityAwareTasksPerResourceProfileId.values.sum
}
def getNumExecutorsStarting: Int = synchronized {
numExecutorsStartingPerResourceProfileId.values.map(_.get()).sum
}
def getNumReleasedContainers: Int = synchronized {
releasedContainers.size
}
def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
@ -186,49 +242,147 @@ private[yarn] class YarnAllocator(
/**
* A sequence of pending container requests that have not yet been fulfilled.
* ResourceProfile id -> pendingAllocate container request
*/
def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST)
def getPendingAllocate: Map[Int, Seq[ContainerRequest]] = getPendingAtLocation(ANY_HOST)
def numContainersPendingAllocate: Int = synchronized {
getPendingAllocate.size
def getNumContainersPendingAllocate: Int = synchronized {
getPendingAllocate.values.flatten.size
}
// YARN priorities are such that lower number is higher priority.
// We need to allocate a different priority for each ResourceProfile because YARN
// won't allow different container resource requirements within a Priority.
// We could allocate per Stage to make sure earlier stages get priority but Spark
// always finishes a stage before starting a later one and if we have 2 running in parallel
// the priority doesn't matter.
// We are using the ResourceProfile id as the priority.
private def getContainerPriority(rpId: Int): Priority = {
Priority.newInstance(rpId)
}
// The ResourceProfile id is the priority
private def getResourceProfileIdFromPriority(priority: Priority): Int = {
priority.getPriority()
}
private def getOrUpdateAllocatedHostToContainersMapForRPId(
rpId: Int): HashMap[String, collection.mutable.Set[ContainerId]] = synchronized {
allocatedHostToContainersMapPerRPId.getOrElseUpdate(rpId,
new HashMap[String, mutable.Set[ContainerId]]())
}
private def getOrUpdateRunningExecutorForRPId(rpId: Int): mutable.Set[String] = synchronized {
runningExecutorsPerResourceProfileId.getOrElseUpdate(rpId, mutable.HashSet[String]())
}
private def getOrUpdateNumExecutorsStartingForRPId(rpId: Int): AtomicInteger = synchronized {
numExecutorsStartingPerResourceProfileId.getOrElseUpdate(rpId, new AtomicInteger(0))
}
private def getOrUpdateTargetNumExecutorsForRPId(rpId: Int): Int = synchronized {
targetNumExecutorsPerResourceProfileId.getOrElseUpdate(rpId,
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf))
}
/**
* A sequence of pending container requests at the given location that have not yet been
* fulfilled.
* A sequence of pending container requests at the given location for each ResourceProfile id
* that have not yet been fulfilled.
*/
private def getPendingAtLocation(location: String): Seq[ContainerRequest] =
amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala
.flatMap(_.asScala)
private def getPendingAtLocation(
location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
rpIdToResourceProfile.keys.map { id =>
val profResource = rpIdToYarnResource.get(id)
val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
.asScala.flatMap(_.asScala)
allContainerRequests(id) = result
}
allContainerRequests.toMap
}
// if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it
private def createYarnResourceForResourceProfile(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = synchronized {
resourceProfileToTotalExecs.foreach { case (rp, num) =>
if (!rpIdToYarnResource.contains(rp.id)) {
// Start with the application or default settings
var heapMem = executorMemory.toLong
// Note we currently don't support off heap memory in ResourceProfile - SPARK-30794
var offHeapMem = executorOffHeapMemory.toLong
var overheadMem = memoryOverhead.toLong
var pysparkMem = pysparkWorkerMemory.toLong
var cores = defaultExecutorCores
val customResources = new mutable.HashMap[String, String]
// track the resource profile if not already there
getOrUpdateRunningExecutorForRPId(rp.id)
logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
val execResources = rp.executorResources
execResources.foreach { case (r, execReq) =>
r match {
case ResourceProfile.MEMORY =>
heapMem = execReq.amount
case ResourceProfile.OVERHEAD_MEM =>
overheadMem = execReq.amount
case ResourceProfile.PYSPARK_MEM =>
pysparkMem = execReq.amount
case ResourceProfile.CORES =>
cores = execReq.amount.toInt
case "gpu" =>
customResources(YARN_GPU_RESOURCE_CONFIG) = execReq.amount.toString
case "fpga" =>
customResources(YARN_FPGA_RESOURCE_CONFIG) = execReq.amount.toString
case rName =>
customResources(rName) = execReq.amount.toString
}
}
val totalMem = (heapMem + offHeapMem + overheadMem + pysparkMem).toInt
val resource = Resource.newInstance(totalMem, cores)
ResourceRequestHelper.setResourceRequests(customResources.toMap, resource)
logDebug(s"Created resource capability: $resource")
rpIdToYarnResource.putIfAbsent(rp.id, resource)
rpIdToResourceProfile(rp.id) = rp
}
}
}
/**
* Request as many executors from the ResourceManager as needed to reach the desired total. If
* the requested total is smaller than the current number of running executors, no executors will
* be killed.
* @param requestedTotal total number of containers requested
* @param localityAwareTasks number of locality aware tasks to be used as container placement hint
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
* container placement hint.
* @param resourceProfileToTotalExecs total number of containers requested for each
* ResourceProfile
* @param numLocalityAwareTasksPerResourceProfileId number of locality aware tasks for each
* ResourceProfile id to be used as container
* placement hint.
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts for each
* ResourceProfile id to be used as container placement hint.
* @param nodeBlacklist blacklisted nodes, which is passed in to avoid allocating new containers
* on them. It will be used to update the application master's blacklist.
* @return Whether the new requested total is different than the old value.
*/
def requestTotalExecutorsWithPreferredLocalities(
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
resourceProfileToTotalExecs: Map[ResourceProfile, Int],
numLocalityAwareTasksPerResourceProfileId: Map[Int, Int],
hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]],
nodeBlacklist: Set[String]): Boolean = synchronized {
this.numLocalityAwareTasks = localityAwareTasks
this.hostToLocalTaskCounts = hostToLocalTaskCount
this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
this.hostToLocalTaskCountPerResourceProfileId = hostToLocalTaskCountPerResourceProfileId
if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal
allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
true
} else {
false
createYarnResourceForResourceProfile(resourceProfileToTotalExecs)
val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
logInfo(s"Driver requested a total number of $numExecs executor(s) " +
s"for resource profile id: ${rp.id}.")
targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
true
} else {
false
}
}
res.exists(_ == true)
}
/**
@ -237,8 +391,9 @@ private[yarn] class YarnAllocator(
def killExecutor(executorId: String): Unit = synchronized {
executorIdToContainer.get(executorId) match {
case Some(container) if !releasedContainers.contains(container.getId) =>
val (_, rpId) = containerIdToExecutorIdAndResourceProfileId(container.getId)
internalReleaseContainer(container)
runningExecutors.remove(executorId)
getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
}
}
@ -267,8 +422,8 @@ private[yarn] class YarnAllocator(
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
runningExecutors.size,
numExecutorsStarting.get,
getNumExecutorsRunning,
getNumExecutorsStarting,
allocateResponse.getAvailableResources))
handleAllocatedContainers(allocatedContainers.asScala)
@ -279,108 +434,122 @@ private[yarn] class YarnAllocator(
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, runningExecutors.size))
.format(completedContainers.size, getNumExecutorsRunning))
}
}
/**
* Update the set of container requests that we will sync with the RM based on the number of
* executors we have currently running and our target number of executors.
* executors we have currently running and our target number of executors for each
* ResourceProfile.
*
* Visible for testing.
*/
def updateResourceRequests(): Unit = {
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - runningExecutors.size
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
s"executorsStarting: ${numExecutorsStarting.get}")
def updateResourceRequests(): Unit = synchronized {
val pendingAllocatePerResourceProfileId = getPendingAllocate
val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case (rpId, targetNum) =>
val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get
val pending = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty).size
val running = getOrUpdateRunningExecutorForRPId(rpId).size
logDebug(s"Updating resource requests for ResourceProfile id: $rpId, target: " +
s"$targetNum, pending: $pending, running: $running, executorsStarting: $starting")
(rpId, targetNum - pending - running - starting)
}.toMap
// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)
missingPerProfile.foreach { case (rpId, missing) =>
val hostToLocalTaskCount =
hostToLocalTaskCountPerResourceProfileId.getOrElse(rpId, Map.empty)
val pendingAllocate = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty)
val numPendingAllocate = pendingAllocate.size
// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCount, pendingAllocate)
if (missing > 0) {
if (log.isInfoEnabled()) {
var requestContainerMessage = s"Will request $missing executor container(s), each with " +
if (missing > 0) {
val resource = rpIdToYarnResource.get(rpId)
if (log.isInfoEnabled()) {
var requestContainerMessage = s"Will request $missing executor container(s) for " +
s" ResourceProfile Id: $rpId, each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
executorResourceRequests.nonEmpty) {
requestContainerMessage ++= s" with custom resources: " + resource.toString
if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) {
requestContainerMessage ++= s" with custom resources: " + resource.toString
}
logInfo(requestContainerMessage)
}
logInfo(requestContainerMessage)
}
// cancel "stale" requests for locations that are no longer needed
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
}
val cancelledContainers = staleRequests.size
if (cancelledContainers > 0) {
logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
}
// consider the number of new containers and cancelled stale containers available
val availableContainers = missing + cancelledContainers
// to maximize locality, include requests with no locality preference that can be cancelled
val potentialContainers = availableContainers + anyHostRequests.size
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
allocatedHostToContainersMap, localRequests)
val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
containerLocalityPreferences.foreach {
case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
newLocalityRequests += createContainerRequest(resource, nodes, racks)
case _ =>
}
if (availableContainers >= newLocalityRequests.size) {
// more containers are available than needed for locality, fill in requests for any host
for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
newLocalityRequests += createContainerRequest(resource, null, null)
// cancel "stale" requests for locations that are no longer needed
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
}
} else {
val numToCancel = newLocalityRequests.size - availableContainers
// cancel some requests without locality preferences to schedule more local containers
anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
amClient.removeContainerRequest(nonLocal)
val cancelledContainers = staleRequests.size
if (cancelledContainers > 0) {
logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
}
if (numToCancel > 0) {
logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
}
}
newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
}
// consider the number of new containers and cancelled stale containers available
val availableContainers = missing + cancelledContainers
if (log.isInfoEnabled()) {
val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
if (anyHost.nonEmpty) {
logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
// to maximize locality, include requests with no locality preference that can be cancelled
val potentialContainers = availableContainers + anyHostRequests.size
val allocatedHostToContainer = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
val numLocalityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(rpId, 0)
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
potentialContainers, numLocalityAwareTasks, hostToLocalTaskCount,
allocatedHostToContainer, localRequests, rpIdToResourceProfile(rpId))
val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
containerLocalityPreferences.foreach {
case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
newLocalityRequests += createContainerRequest(resource, nodes, racks, rpId)
case _ =>
}
localized.foreach { request =>
logInfo(s"Submitted container request for host ${hostStr(request)}.")
if (availableContainers >= newLocalityRequests.size) {
// more containers are available than needed for locality, fill in requests for any host
for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
newLocalityRequests += createContainerRequest(resource, null, null, rpId)
}
} else {
val numToCancel = newLocalityRequests.size - availableContainers
// cancel some requests without locality preferences to schedule more local containers
anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
amClient.removeContainerRequest(nonLocal)
}
if (numToCancel > 0) {
logInfo(s"Canceled $numToCancel unlocalized container requests to " +
s"resubmit with locality")
}
}
newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
}
if (log.isInfoEnabled()) {
val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
if (anyHost.nonEmpty) {
logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
}
localized.foreach { request =>
logInfo(s"Submitted container request for host ${hostStr(request)}.")
}
}
} else if (numPendingAllocate > 0 && missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new " +
s"desired total ${getOrUpdateTargetNumExecutorsForRPId(rpId)} executors.")
// cancel pending allocate requests by taking locality preference into account
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
cancelRequests.foreach(amClient.removeContainerRequest)
}
} else if (numPendingAllocate > 0 && missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")
// cancel pending allocate requests by taking locality preference into account
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
cancelRequests.foreach(amClient.removeContainerRequest)
}
}
@ -405,8 +574,10 @@ private[yarn] class YarnAllocator(
private def createContainerRequest(
resource: Resource,
nodes: Array[String],
racks: Array[String]): ContainerRequest = {
new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull)
racks: Array[String],
rpId: Int): ContainerRequest = {
new ContainerRequest(resource, nodes, racks, getContainerPriority(rpId),
true, labelExpression.orNull)
}
/**
@ -499,20 +670,17 @@ private[yarn] class YarnAllocator(
location: String,
containersToUse: ArrayBuffer[Container],
remaining: ArrayBuffer[Container]): Unit = {
// SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the
// request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested
// memory, but use the asked vcore count for matching, effectively disabling matching on vcore
// count.
val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
resource.getVirtualCores)
ResourceRequestHelper.setResourceRequests(executorResourceRequests, matchingResource)
// Match on the exact resource we requested so there shouldn't be a mismatch,
// we are relying on YARN to return a container with resources no less then we requested.
// If we change this, or starting validating the container, be sure the logic covers SPARK-6050.
val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
val resourceForRP = rpIdToYarnResource.get(rpId)
logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
s"priority: ${allocatedContainer.getPriority}, " +
s"location: $location, resource: $matchingResource")
s"location: $location, resource: $resourceForRP")
val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
matchingResource)
resourceForRP)
// Match the allocation to a request
if (!matchingRequests.isEmpty) {
@ -528,30 +696,38 @@ private[yarn] class YarnAllocator(
/**
* Launches executors in the allocated containers.
*/
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
for (container <- containersToUse) {
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
assert(container.getResource.getMemory >= resource.getMemory)
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId")
s"for executor with ID $executorId for ResourceProfile Id $rpId")
def updateInternalState(): Unit = synchronized {
runningExecutors.add(executorId)
numExecutorsStarting.decrementAndGet()
getOrUpdateRunningExecutorForRPId(rpId).add(executorId)
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
containerIdToExecutorIdAndResourceProfileId(container.getId) = (executorId, rpId)
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
}
if (runningExecutors.size() < targetNumExecutors) {
numExecutorsStarting.incrementAndGet()
val rp = rpIdToResourceProfile(rpId)
val containerMem = rp.executorResources.get(ResourceProfile.MEMORY).
map(_.amount.toInt).getOrElse(executorMemory)
val containerCores = rp.getExecutorCores.getOrElse(defaultExecutorCores)
val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
if (launchContainers) {
launcherPool.execute(() => {
try {
@ -562,17 +738,17 @@ private[yarn] class YarnAllocator(
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
containerMem,
containerCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
rp.id
).run()
updateInternalState()
} catch {
case e: Throwable =>
numExecutorsStarting.decrementAndGet()
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
if (NonFatal(e)) {
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately
@ -589,24 +765,28 @@ private[yarn] class YarnAllocator(
}
} else {
logInfo(("Skip launching executorRunnable as running executors count: %d " +
"reached target executors count: %d.").format(
runningExecutors.size, targetNumExecutors))
"reached target executors count: %d.").format(rpRunningExecs,
getOrUpdateTargetNumExecutorsForRPId(rpId)))
}
}
}
// Visible for testing.
private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
private[yarn] def processCompletedContainers(
completedContainers: Seq[ContainerStatus]): Unit = synchronized {
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId
val (_, rpId) = containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId,
("", DEFAULT_RESOURCE_PROFILE_ID))
val alreadyReleased = releasedContainers.remove(containerId)
val hostOpt = allocatedContainerToHostMap.get(containerId)
val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
containerIdToExecutorId.get(containerId) match {
case Some(executorId) => runningExecutors.remove(executorId)
containerIdToExecutorIdAndResourceProfileId.get(containerId) match {
case Some((executorId, _)) =>
getOrUpdateRunningExecutorForRPId(rpId).remove(executorId)
case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}")
}
@ -679,19 +859,19 @@ private[yarn] class YarnAllocator(
for {
host <- hostOpt
containerSet <- allocatedHostToContainersMap.get(host)
containerSet <- getOrUpdateAllocatedHostToContainersMapForRPId(rpId).get(host)
} {
containerSet.remove(containerId)
if (containerSet.isEmpty) {
allocatedHostToContainersMap.remove(host)
getOrUpdateAllocatedHostToContainersMapForRPId(rpId).remove(host)
} else {
allocatedHostToContainersMap.update(host, containerSet)
getOrUpdateAllocatedHostToContainersMapForRPId(rpId).update(host, containerSet)
}
allocatedContainerToHostMap.remove(containerId)
}
containerIdToExecutorId.remove(containerId).foreach { eid =>
containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
executorIdToContainer.remove(eid)
pendingLossReasonRequests.remove(eid) match {
case Some(pendingRequests) =>
@ -737,12 +917,14 @@ private[yarn] class YarnAllocator(
}
}
private def internalReleaseContainer(container: Container): Unit = {
private def internalReleaseContainer(container: Container): Unit = synchronized {
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
}
private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
private[yarn] def getNumUnexpectedContainerRelease: Long = synchronized {
numUnexpectedContainerRelease
}
private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
pendingLossReasonRequests.size


@ -130,12 +130,8 @@ private[spark] abstract class YarnSchedulerBackend(
val filteredRPHostToLocalTaskCount = rpHostToLocalTaskCount.map { case (rpid, v) =>
(rpid, v.filter { case (host, count) => !nodeBlacklist.contains(host) })
}
// TODO - default everything to default profile until YARN pieces
val defaultProf = ResourceProfile.getOrCreateDefaultProfile(conf)
val hostToLocalTaskCount = filteredRPHostToLocalTaskCount.getOrElse(defaultProf.id, Map.empty)
val localityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(defaultProf.id, 0)
val numExecutors = resourceProfileToTotalExecs.getOrElse(defaultProf, 0)
RequestExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount, nodeBlacklist)
RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId,
filteredRPHostToLocalTaskCount, nodeBlacklist)
}
/**


@ -17,10 +17,13 @@
package org.apache.spark.deploy.yarn
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.SparkConf
import org.apache.spark.SparkFunSuite
import org.apache.spark.resource.ResourceProfile
class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
@ -28,7 +31,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
import yarnAllocatorSuite._
def createContainerRequest(nodes: Array[String]): ContainerRequest =
new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
new ContainerRequest(containerResource, nodes, null, Priority.newInstance(1))
override def beforeEach(): Unit = {
yarnAllocatorSuite.beforeEach()
@ -38,18 +41,22 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
yarnAllocatorSuite.afterEach()
}
val defaultResourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
test("allocate locality preferred containers with enough resource and no matched existed " +
"containers") {
// 1. All the locations of current containers cannot satisfy the new requirements
// 2. Current requested container number can fully satisfy the pending tasks.
val handler = createAllocator(2)
val (handler, allocatorConf) = createAllocator(2)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
ResourceProfile.clearDefaultProfile
val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10),
handler.allocatedHostToContainersMap, Seq.empty)
handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
assert(localities.map(_.nodes) === Array(
Array("host3", "host4", "host5"),
@ -62,7 +69,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
// 1. Parts of current containers' locations can satisfy the new requirements
// 2. Current requested container number can fully satisfy the pending tasks.
val handler = createAllocator(3)
val (handler, allocatorConf) = createAllocator(3)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(
createContainer("host1"),
@ -70,9 +77,12 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
createContainer("host2")
))
ResourceProfile.clearDefaultProfile
val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
handler.allocatedHostToContainersMap, Seq.empty)
handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
assert(localities.map(_.nodes) ===
Array(null, Array("host2", "host3"), Array("host2", "host3")))
@ -83,7 +93,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
// 1. Parts of current containers' locations can satisfy the new requirements
// 2. Current requested container number cannot fully satisfy the pending tasks.
val handler = createAllocator(3)
val (handler, allocatorConf) = createAllocator(3)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(
createContainer("host1"),
@ -91,9 +101,11 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
createContainer("host2")
))
ResourceProfile.clearDefaultProfile
val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
handler.allocatedHostToContainersMap, Seq.empty)
handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
}
@ -101,7 +113,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
test("allocate locality preferred containers with fully matched containers") {
// Current containers' locations can fully satisfy the new requirements
val handler = createAllocator(5)
val (handler, allocatorConf) = createAllocator(5)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(
createContainer("host1"),
@ -111,9 +123,11 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
createContainer("host3")
))
ResourceProfile.clearDefaultProfile
val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
handler.allocatedHostToContainersMap, Seq.empty)
handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
assert(localities.map(_.nodes) === Array(null, null, null))
}
@ -121,18 +135,21 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
test("allocate containers with no locality preference") {
// Request new container without locality preference
val handler = createAllocator(2)
val (handler, allocatorConf) = createAllocator(2)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
ResourceProfile.clearDefaultProfile
val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty)
1, 0, Map.empty,
handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp)
assert(localities.map(_.nodes) === Array(null))
}
test("allocate locality preferred containers by considering the localities of pending requests") {
val handler = createAllocator(3)
val (handler, allocatorConf) = createAllocator(3)
handler.updateResourceRequests()
handler.handleAllocatedContainers(Array(
createContainer("host1"),
@ -144,9 +161,12 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B
createContainerRequest(Array("host2", "host3")),
createContainerRequest(Array("host1", "host4")))
ResourceProfile.clearDefaultProfile
val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf)
val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
handler.allocatedHostToContainersMap, pendingAllocationRequests)
handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId),
pendingAllocationRequests, rp)
assert(localities.map(_.nodes) === Array(Array("host3")))
}


@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Mockito._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.resource.ResourceProfile
class LocalityPlacementStrategySuite extends SparkFunSuite {
@ -58,7 +59,7 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
val resource = Resource.newInstance(8 * 1024, 4)
val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
yarnConf, resource, new MockResolver())
yarnConf, new MockResolver())
val totalTasks = 32 * 1024
val totalContainers = totalTasks / 16
@ -75,9 +76,10 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
containers.drop(count * i).take(i).foreach { c => hostContainers += c }
hostToContainerMap(host) = hostContainers
}
val rp = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
hostToContainerMap, Nil)
hostToContainerMap, Nil, rp)
}
}


@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
@ -32,9 +33,9 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config._
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
import org.apache.spark.resource.ResourceUtils.{AMOUNT, GPU}
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.RpcEndpointRef
@ -69,6 +70,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
var containerNum = 0
// priority has to be 0 to match default profile id
val RM_REQUEST_PRIORITY = Priority.newInstance(0)
val defaultRPId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
val defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
override def beforeEach(): Unit = {
super.beforeEach()
rmClient = AMRMClient.createAMRMClient()
@ -93,7 +99,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(
maxExecutors: Int = 5,
rmClient: AMRMClient[ContainerRequest] = rmClient,
additionalConfigs: Map[String, String] = Map()): YarnAllocator = {
additionalConfigs: Map[String, String] = Map()): (YarnAllocator, SparkConf) = {
val args = Array(
"--jar", "somejar.jar",
"--class", "SomeClass")
@@ -107,7 +113,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone.set(name, value)
}
new YarnAllocator(
val allocator = new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
conf,
@@ -118,16 +124,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
Map(),
new MockResolver(),
clock)
(allocator, sparkConfClone)
}
def createContainer(
host: String,
containerNumber: Int = containerNum,
resource: Resource = containerResource): Container = {
resource: Resource = containerResource,
priority: Priority = RM_REQUEST_PRIORITY): Container = {
val containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
Container.newInstance(containerId, nodeId, "", resource, priority, null)
}
def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
@@ -145,37 +153,125 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
test("single container allocated") {
// request a single container and receive it
val handler = createAllocator(1)
val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (1)
handler.getNumContainersPendingAllocate should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)
val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
size should be (0)
}
test("single container allocated with ResourceProfile") {
assume(isYarnResourceTypesAvailable())
val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG)
ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
// create the default profile first so the custom profile below gets a different id
val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val execReq = new ExecutorResourceRequests().resource("gpu", 6)
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
// request a single container and receive it
val (handler, _) = createAllocator(0)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumContainersPendingAllocate should be (1)
val container = createContainer("host1", priority = Priority.newInstance(rprof.id))
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
hostTocontainer.get("host1").get should contain(container.getId)
val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
size should be (0)
ResourceProfile.reInitDefaultProfile(sparkConf)
}
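// A minimal sketch (not part of the suite) of the convention the test above exercises:
// containers for a ResourceProfile are requested at a YARN Priority equal to the profile
// id, so an allocated container can be matched back to the profile it was requested for
// by reading its priority. `resourceProfileIdFor` is a hypothetical helper for
// illustration only, not Spark or YARN API.
private def resourceProfileIdFor(container: Container): Int =
  container.getPriority.getPriority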
test("multiple containers allocated with ResourceProfiles") {
assume(isYarnResourceTypesAvailable())
val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG)
ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
// create the default profile first so the custom profiles below get different ids
val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
val execReq = new ExecutorResourceRequests().resource("gpu", 6)
val taskReq = new TaskResourceRequests().resource("gpu", 1)
val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
val execReq2 = new ExecutorResourceRequests().memory("8g").resource("fpga", 2)
val taskReq2 = new TaskResourceRequests().resource("fpga", 1)
val rprof2 = new ResourceProfile(execReq2.requests, taskReq2.requests)
// request containers for both custom profiles and receive them
val (handler, _) = createAllocator(1)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1, rprof2 -> 2)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0, rprof2.id -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
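// 3 pending = 1 container requested for rprof plus 2 for rprof2 (the default profile requested 0)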
handler.getNumContainersPendingAllocate should be (3)
val containerResourcerp2 = Resource.newInstance(10240, 5)
val container = createContainer("host1", priority = Priority.newInstance(rprof.id))
val container2 = createContainer("host2", resource = containerResourcerp2,
priority = Priority.newInstance(rprof2.id))
val container3 = createContainer("host3", resource = containerResourcerp2,
priority = Priority.newInstance(rprof2.id))
handler.handleAllocatedContainers(Array(container, container2, container3))
handler.getNumExecutorsRunning should be (3)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host3")
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
hostTocontainer.get("host1").get should contain(container.getId)
val hostTocontainer2 = handler.allocatedHostToContainersMapPerRPId(rprof2.id)
hostTocontainer2.get("host2").get should contain(container2.getId)
hostTocontainer2.get("host3").get should contain(container3.getId)
val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
size should be (0)
ResourceProfile.reInitDefaultProfile(sparkConf)
}
test("custom resource requested from yarn") {
assume(isYarnResourceTypesAvailable())
ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
val handler = createAllocator(1, mockAmClient,
val (handler, _) = createAllocator(1, mockAmClient,
Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
handler.updateResourceRequests()
val container = createContainer("host1", resource = handler.resource)
val container = createContainer("host1", resource = handler.defaultResource)
handler.handleAllocatedContainers(Array(container))
// get amount of memory and vcores from resource, so effectively skipping their validation
val expectedResources = Resource.newInstance(handler.resource.getMemory(),
handler.resource.getVirtualCores)
val expectedResources = Resource.newInstance(handler.defaultResource.getMemory(),
handler.defaultResource.getVirtualCores)
setResourceRequests(Map("gpu" -> "2G"), expectedResources)
val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
@@ -195,10 +291,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
Map(EXECUTOR_GPU_ID.amountConf -> "3",
EXECUTOR_FPGA_ID.amountConf -> "2",
madeupConfigName -> "5")
val handler = createAllocator(1, mockAmClient, sparkResources)
val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
handler.updateResourceRequests()
val yarnRInfo = ResourceRequestTestHelper.getResources(handler.resource)
val yarnRInfo = ResourceRequestTestHelper.getResources(handler.defaultResource)
val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) ).toMap
assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
@@ -210,17 +306,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
test("container should not be created if requested number if met") {
// request a single container and receive it
val handler = createAllocator(1)
val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (1)
handler.getNumContainersPendingAllocate should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container2))
@@ -229,10 +326,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
test("some containers allocated") {
// request a few containers and receive some of them
val handler = createAllocator(4)
val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (4)
handler.getNumContainersPendingAllocate should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host1")
@@ -243,16 +340,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container1.getId)
hostTocontainer.get("host1").get should contain (container2.getId)
hostTocontainer.get("host2").get should contain (container3.getId)
}
test("receive more containers than requested") {
val handler = createAllocator(2)
val (handler, _) = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (2)
handler.getNumContainersPendingAllocate should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -263,42 +361,52 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
handler.allocatedHostToContainersMap.contains("host4") should be (false)
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container1.getId)
hostTocontainer.get("host2").get should contain (container2.getId)
hostTocontainer.contains("host4") should be (false)
}
test("decrease total requested executors") {
val handler = createAllocator(4)
val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (4)
handler.getNumContainersPendingAllocate should be (4)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 3)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (3)
handler.getNumContainersPendingAllocate should be (3)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
resourceProfileToTotalExecs(defaultRP) = 2
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (1)
handler.getNumContainersPendingAllocate should be (1)
}
test("decrease total requested executors to less than currently running") {
val handler = createAllocator(4)
val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (4)
handler.getNumContainersPendingAllocate should be (4)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 3)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (3)
handler.getNumContainersPendingAllocate should be (3)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
@@ -306,23 +414,28 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.getNumExecutorsRunning should be (2)
handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
resourceProfileToTotalExecs(defaultRP) = 1
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (0)
handler.getNumContainersPendingAllocate should be (0)
handler.getNumExecutorsRunning should be (2)
}
test("kill executors") {
val handler = createAllocator(4)
val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (4)
handler.getNumContainersPendingAllocate should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 1)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id) }
val statuses = Seq(container1, container2).map { c =>
@@ -331,20 +444,20 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.updateResourceRequests()
handler.processCompletedContainers(statuses)
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (1)
handler.getNumContainersPendingAllocate should be (1)
}
test("kill same executor multiple times") {
val handler = createAllocator(2)
val (handler, _) = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (2)
handler.getNumContainersPendingAllocate should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.getNumExecutorsRunning should be (2)
handler.getPendingAllocate.size should be (0)
handler.getNumContainersPendingAllocate should be (0)
val executorToKill = handler.executorIdToContainer.keys.head
handler.killExecutor(executorToKill)
@@ -353,22 +466,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.killExecutor(executorToKill)
handler.killExecutor(executorToKill)
handler.getNumExecutorsRunning should be (1)
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
handler.updateResourceRequests()
handler.getPendingAllocate.size should be (1)
handler.getNumContainersPendingAllocate should be (1)
}
test("process same completed container multiple times") {
val handler = createAllocator(2)
val (handler, _) = createAllocator(2)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (2)
handler.getNumContainersPendingAllocate should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.getNumExecutorsRunning should be (2)
handler.getPendingAllocate.size should be (0)
handler.getNumContainersPendingAllocate should be (0)
val statuses = Seq(container1, container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
@@ -379,16 +495,19 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
}
test("lost executor removed from backend") {
val handler = createAllocator(4)
val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (4)
handler.getNumContainersPendingAllocate should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set.empty)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
val statuses = Seq(container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
@@ -397,7 +516,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
handler.processCompletedContainers(statuses)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (2)
handler.getNumContainersPendingAllocate should be (2)
handler.getNumExecutorsFailed should be (2)
handler.getNumUnexpectedContainerRelease should be (2)
}
@@ -406,28 +525,35 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
// Internally we track the set of blacklisted nodes, but yarn wants us to send *changes*
// to the blacklist. This makes sure we are sending the right updates.
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
val handler = createAllocator(4, mockAmClient)
handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
val (handler, _) = createAllocator(4, mockAmClient)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 1)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set("hostA"))
verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq[String]().asJava)
val blacklistedNodes = Set(
"hostA",
"hostB"
)
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), blacklistedNodes)
verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq[String]().asJava)
handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set.empty)
resourceProfileToTotalExecs(defaultRP) = 2
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map(), blacklistedNodes)
verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq[String]().asJava)
resourceProfileToTotalExecs(defaultRP) = 3
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava)
}
test("window based failure executor counting") {
sparkConf.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100 * 1000L)
val handler = createAllocator(4)
val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (4)
handler.getNumContainersPendingAllocate should be (4)
val containers = Seq(
createContainer("host1"),
@@ -468,7 +594,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val rmClientSpy = spy(rmClient)
val maxExecutors = 11
val handler = createAllocator(
val (handler, _) = createAllocator(
maxExecutors,
rmClientSpy,
Map(
@@ -525,9 +651,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
try {
sparkConf.set(MEMORY_OFFHEAP_ENABLED, true)
sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
val allocator = createAllocator(maxExecutors = 1,
val (handler, _) = createAllocator(maxExecutors = 1,
additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
val memory = allocator.resource.getMemory
val memory = handler.defaultResource.getMemory
assert(memory ==
executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
} finally {


@@ -51,9 +51,8 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
private class TestYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
extends YarnSchedulerBackend(scheduler, sc) {
def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
this.rpHostToLocalTaskCount = Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID ->
hostToLocalTaskCount)
def setHostToLocalTaskCount(hostToLocalTaskCount: Map[Int, Map[String, Int]]): Unit = {
this.rpHostToLocalTaskCount = hostToLocalTaskCount
}
}
@@ -64,21 +63,24 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
val yarnSchedulerBackendExtended = new TestYarnSchedulerBackend(sched, sc)
yarnSchedulerBackend = yarnSchedulerBackendExtended
val ser = new JavaSerializer(sc.conf).newInstance()
val defaultResourceProf = ResourceProfile.getOrCreateDefaultProfile(sc.getConf)
for {
blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
numRequested <- 0 until 10
hostToLocalCount <- IndexedSeq(
Map[String, Int](),
Map("a" -> 1, "b" -> 2)
Map(defaultResourceProf.id -> Map.empty[String, Int]),
Map(defaultResourceProf.id -> Map("a" -> 1, "b" -> 2))
)
} {
yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount)
sched.setNodeBlacklist(blacklist)
val numReq = Map(ResourceProfile.getOrCreateDefaultProfile(sc.getConf) -> numRequested)
val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numReq)
assert(req.requestedTotal === numRequested)
val request = Map(defaultResourceProf -> numRequested)
val req = yarnSchedulerBackendExtended.prepareRequestExecutors(request)
assert(req.resourceProfileToTotalExecs(defaultResourceProf) === numRequested)
assert(req.nodeBlacklist === blacklist)
assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
val hosts =
req.hostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID).keySet
assert(hosts.intersect(blacklist).isEmpty)
// Serialize to make sure serialization doesn't throw an error
ser.serialize(req)
}