diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 8db0122f17..465c0d20de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -117,9 +117,9 @@ private[spark] object CoarseGrainedClusterMessages { // Request executors by specifying the new total number of executors desired // This includes executors already pending or running case class RequestExecutors( - requestedTotal: Int, - localityAwareTasks: Int, - hostToLocalTaskCount: Map[String, Int], + resourceProfileToTotalExecs: Map[ResourceProfile, Int], + numLocalityAwareTasksPerResourceProfileId: Map[Int, Int], + hostToLocalTaskCount: Map[Int, Map[String, Int]], nodeBlacklist: Set[String]) extends CoarseGrainedClusterMessage diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index a9296955d1..312691302b 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING -import org.apache.spark.resource.ResourceProfile +import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager} import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -61,6 +61,7 @@ class HeartbeatReceiverSuite PrivateMethod[collection.Map[String, Long]](Symbol("executorLastSeen")) private val _executorTimeoutMs = PrivateMethod[Long](Symbol("executorTimeoutMs")) private val _killExecutorThread = PrivateMethod[ExecutorService](Symbol("killExecutorThread")) + var conf: SparkConf = _ /** * Before each test, set up the SparkContext and a custom [[HeartbeatReceiver]] @@ -68,7 +69,7 @@ class HeartbeatReceiverSuite */ override def beforeEach(): Unit = { super.beforeEach() - val conf = new SparkConf() + conf = new SparkConf() .setMaster("local[2]") .setAppName("test") .set(DYN_ALLOCATION_TESTING, true) @@ -76,7 +77,6 @@ class HeartbeatReceiverSuite scheduler = mock(classOf[TaskSchedulerImpl]) when(sc.taskScheduler).thenReturn(scheduler) when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]()) - when(scheduler.resourcesReqsPerTask).thenReturn(Seq.empty) when(scheduler.sc).thenReturn(sc) heartbeatReceiverClock = new ManualClock heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock) @@ -164,9 +164,10 @@ class HeartbeatReceiverSuite test("expire dead hosts should kill executors with replacement (SPARK-8119)") { // Set up a fake backend and cluster manager to simulate killing executors val rpcEnv = sc.env.rpcEnv - val fakeClusterManager = new FakeClusterManager(rpcEnv) + val fakeClusterManager = new FakeClusterManager(rpcEnv, conf) val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager) - val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef) + val fakeSchedulerBackend = + new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef, sc.resourceProfileManager) 
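    // For illustration: RequestExecutors is now keyed by ResourceProfile rather than carrying a
    // single global total, so a request that only uses the default profile would look roughly
    // like this (a sketch, assuming a SparkConf named `conf` is in scope):
    //
    //   val defaultProf = ResourceProfile.getOrCreateDefaultProfile(conf)
    //   RequestExecutors(
    //     resourceProfileToTotalExecs = Map(defaultProf -> 2),
    //     numLocalityAwareTasksPerResourceProfileId = Map(defaultProf.id -> 0),
    //     hostToLocalTaskCount = Map(defaultProf.id -> Map.empty[String, Int]),
    //     nodeBlacklist = Set.empty)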
when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend) // Register fake executors with our fake scheduler backend @@ -282,18 +283,16 @@ private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpo private class FakeSchedulerBackend( scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv, - clusterManagerEndpoint: RpcEndpointRef) + clusterManagerEndpoint: RpcEndpointRef, + resourceProfileManager: ResourceProfileManager) extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { protected override def doRequestTotalExecutors( resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = { clusterManagerEndpoint.ask[Boolean]( - RequestExecutors( - resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf)), - numLocalityAwareTasksPerResourceProfileId(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID), - rpHostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID), - Set.empty)) - } + RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId, + rpHostToLocalTaskCount, Set.empty)) +} protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = { clusterManagerEndpoint.ask[Boolean](KillExecutors(executorIds)) @@ -303,7 +302,7 @@ private class FakeSchedulerBackend( /** * Dummy cluster manager to simulate responses to executor allocation requests. */ -private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoint { +private class FakeClusterManager(override val rpcEnv: RpcEnv, conf: SparkConf) extends RpcEndpoint { private var targetNumExecutors = 0 private val executorIdsToKill = new mutable.HashSet[String] @@ -311,8 +310,9 @@ private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoin def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RequestExecutors(requestedTotal, _, _, _) => - targetNumExecutors = requestedTotal + case RequestExecutors(resourceProfileToTotalExecs, _, _, _) => + targetNumExecutors = + resourceProfileToTotalExecs(ResourceProfile.getOrCreateDefaultProfile(conf)) context.reply(true) case KillExecutors(executorIds) => executorIdsToKill ++= executorIds diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 1e8f4084ef..43cd7458ef 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -593,7 +593,7 @@ private[spark] class ApplicationMaster( } } try { - val numPendingAllocate = allocator.getPendingAllocate.size + val numPendingAllocate = allocator.getNumContainersPendingAllocate var sleepStartNs = 0L var sleepInterval = 200L // ms allocatorLock.synchronized { @@ -778,8 +778,11 @@ private[spark] class ApplicationMaster( case r: RequestExecutors => Option(allocator) match { case Some(a) => - if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal, - r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) { + if (a.requestTotalExecutorsWithPreferredLocalities( + r.resourceProfileToTotalExecs, + r.numLocalityAwareTasksPerResourceProfileId, + r.hostToLocalTaskCount, + r.nodeBlacklist)) { resetAllocatorInterval() } context.reply(true) diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala index 0fec916582..62ac17cff1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterSource.scala @@ -40,11 +40,11 @@ private[spark] class ApplicationMasterSource(prefix: String, yarnAllocator: Yarn }) metricRegistry.register(MetricRegistry.name("numLocalityAwareTasks"), new Gauge[Int] { - override def getValue: Int = yarnAllocator.numLocalityAwareTasks + override def getValue: Int = yarnAllocator.getNumLocalityAwareTasks }) metricRegistry.register(MetricRegistry.name("numContainersPendingAllocate"), new Gauge[Int] { - override def getValue: Int = yarnAllocator.numContainersPendingAllocate + override def getValue: Int = yarnAllocator.getNumContainersPendingAllocate }) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala index 2288bb55d8..a6380abbaa 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId, Resource} import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.spark.SparkConf -import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceProfile private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) @@ -82,7 +82,6 @@ private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], rack private[yarn] class LocalityPreferredContainerPlacementStrategy( val sparkConf: SparkConf, val yarnConf: Configuration, - val resource: Resource, resolver: SparkRackResolver) { /** @@ -96,6 +95,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy( * containers * @param localityMatchedPendingAllocations A sequence of pending container request which * matches the localities of current required tasks. + * @param rp The ResourceProfile associated with this container. 
* @return node localities and rack localities, each locality is an array of string, * the length of localities is the same as number of containers */ @@ -104,11 +104,12 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy( numLocalityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int], allocatedHostToContainersMap: HashMap[String, Set[ContainerId]], - localityMatchedPendingAllocations: Seq[ContainerRequest] + localityMatchedPendingAllocations: Seq[ContainerRequest], + rp: ResourceProfile ): Array[ContainerLocalityPreferences] = { val updatedHostToContainerCount = expectedHostToContainerCount( numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap, - localityMatchedPendingAllocations) + localityMatchedPendingAllocations, rp) val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum // The number of containers to allocate, divided into two groups, one with preferred locality, @@ -152,11 +153,14 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy( } /** - * Calculate the number of executors need to satisfy the given number of pending tasks. + * Calculate the number of executors needed to satisfy the given number of pending tasks for + * the ResourceProfile. */ - private def numExecutorsPending(numTasksPending: Int): Int = { - val coresPerExecutor = resource.getVirtualCores - (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor + private def numExecutorsPending( + numTasksPending: Int, + rp: ResourceProfile): Int = { + val tasksPerExec = rp.maxTasksPerExecutor(sparkConf) + math.ceil(numTasksPending / tasksPerExec.toDouble).toInt } /** @@ -175,14 +179,15 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy( localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int], allocatedHostToContainersMap: HashMap[String, Set[ContainerId]], - localityMatchedPendingAllocations: Seq[ContainerRequest] + localityMatchedPendingAllocations: Seq[ContainerRequest], + rp: ResourceProfile ): Map[String, Int] = { val totalLocalTaskNum = hostToLocalTaskCount.values.sum val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations) hostToLocalTaskCount.map { case (host, count) => val expectedCount = - count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum + count.toDouble * numExecutorsPending(localityAwareTasks, rp) / totalLocalTaskNum // Take the locality of pending containers into consideration val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) + pendingHostToContainersMap.getOrElse(host, 0.0) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala index ae316b02ee..3d800be9e2 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala @@ -227,6 +227,17 @@ private object ResourceRequestHelper extends Logging { resourceInformation } + def isYarnCustomResourcesNonEmpty(resource: Resource): Boolean = { + try { + // Use reflection as this uses APIs only available in Hadoop 3 + val getResourcesMethod = resource.getClass().getMethod("getResources") + val resources = getResourcesMethod.invoke(resource).asInstanceOf[Array[Any]] + if (resources.nonEmpty) true else false + } catch { + case _: NoSuchMethodException => 
false + } + } + /** * Checks whether Hadoop 2.x or 3 is used as a dependency. * In case of Hadoop 3 and later, the ResourceInformation class diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 09414cbbe5..cd0e7d5c87 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -17,9 +17,9 @@ package org.apache.spark.deploy.yarn -import java.util.Collections -import java.util.concurrent._ +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger +import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable @@ -39,6 +39,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ import org.apache.spark.resource.ResourceProfile +import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor @@ -75,19 +76,69 @@ private[yarn] class YarnAllocator( import YarnAllocator._ // Visible for testing. - val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]] + @GuardedBy("this") + val allocatedHostToContainersMapPerRPId = + new HashMap[Int, HashMap[String, collection.mutable.Set[ContainerId]]] + + @GuardedBy("this") val allocatedContainerToHostMap = new HashMap[ContainerId, String] // Containers that we no longer care about. We've either already told the RM to release them or // will on the next heartbeat. Containers get removed from this map after the RM tells us they've // completed. - private val releasedContainers = Collections.newSetFromMap[ContainerId]( - new ConcurrentHashMap[ContainerId, java.lang.Boolean]) + @GuardedBy("this") + private val releasedContainers = collection.mutable.HashSet[ContainerId]() - private val runningExecutors = Collections.newSetFromMap[String]( - new ConcurrentHashMap[String, java.lang.Boolean]()) + @GuardedBy("this") + private val runningExecutorsPerResourceProfileId = new HashMap[Int, mutable.Set[String]]() - private val numExecutorsStarting = new AtomicInteger(0) + @GuardedBy("this") + private val numExecutorsStartingPerResourceProfileId = new HashMap[Int, AtomicInteger] + + @GuardedBy("this") + private val targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int] + + // Executor loss reason requests that are pending - maps from executor ID for inquiry to a + // list of requesters that should be responded to once we find out why the given executor + // was lost. + @GuardedBy("this") + private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]] + + // Maintain loss reasons for already released executors, it will be added when executor loss + // reason is got from AM-RM call, and be removed after querying this loss reason. + @GuardedBy("this") + private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason] + + // Keep track of which container is running which executor to remove the executors later + // Visible for testing. 
+ @GuardedBy("this") + private[yarn] val executorIdToContainer = new HashMap[String, Container] + + @GuardedBy("this") + private var numUnexpectedContainerRelease = 0L + + @GuardedBy("this") + private val containerIdToExecutorIdAndResourceProfileId = new HashMap[ContainerId, (String, Int)] + + // Use a ConcurrentHashMap because this is used in matchContainerToRequest, which is called + // from the rack resolver thread where synchronize(this) on this would cause a deadlock. + @GuardedBy("ConcurrentHashMap") + private[yarn] val rpIdToYarnResource = new ConcurrentHashMap[Int, Resource]() + + // note currently we don't remove ResourceProfiles + @GuardedBy("this") + private[yarn] val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile] + + // A map of ResourceProfile id to a map of preferred hostname and possible + // task numbers running on it. + @GuardedBy("this") + private var hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]] = + Map(DEFAULT_RESOURCE_PROFILE_ID -> Map.empty) + + // ResourceProfile Id to number of tasks that have locality preferences in active stages + @GuardedBy("this") + private[yarn] var numLocalityAwareTasksPerResourceProfileId: Map[Int, Int] = + Map(DEFAULT_RESOURCE_PROFILE_ID -> 0) /** * Used to generate a unique ID per executor @@ -102,6 +153,7 @@ private[yarn] class YarnAllocator( * * @see SPARK-12864 */ + @GuardedBy("this") private var executorIdCounter: Int = driverRef.askSync[Int](RetrieveLastAllocatedExecutorId) @@ -110,26 +162,6 @@ private[yarn] class YarnAllocator( private val allocatorBlacklistTracker = new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker) - @volatile private var targetNumExecutors = - SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf) - - - // Executor loss reason requests that are pending - maps from executor ID for inquiry to a - // list of requesters that should be responded to once we find out why the given executor - // was lost. - private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]] - - // Maintain loss reasons for already released executors, it will be added when executor loss - // reason is got from AM-RM call, and be removed after querying this loss reason. - private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason] - - // Keep track of which container is running which executor to remove the executors later - // Visible for testing. - private[yarn] val executorIdToContainer = new HashMap[String, Container] - - private var numUnexpectedContainerRelease = 0L - private val containerIdToExecutorId = new HashMap[ContainerId, String] - // Executor memory in MiB. protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt // Executor offHeap memory in MiB. @@ -142,17 +174,18 @@ private[yarn] class YarnAllocator( } else { 0 } - // Number of cores per executor. 
-  protected val executorCores = sparkConf.get(EXECUTOR_CORES)
+  // Number of cores per executor for the default profile
+  protected val defaultExecutorCores = sparkConf.get(EXECUTOR_CORES)
   private val executorResourceRequests =
     getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
       getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
-  // Resource capability requested for each executor
-  private[yarn] val resource: Resource = {
-    val resource = Resource.newInstance(
-      executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
+  // Resource capability requested for each executor for the default profile
+  private[yarn] val defaultResource: Resource = {
+    val resource: Resource = Resource.newInstance(
+      executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory,
+      defaultExecutorCores)
     ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
     logDebug(s"Created resource capability: $resource")
     resource
@@ -166,19 +199,42 @@ private[yarn] class YarnAllocator(
   private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
-  // A map to store preferred hostname and possible task numbers running on it.
-  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
-
-  // Number of tasks that have locality preferences in active stages
-  private[yarn] var numLocalityAwareTasks: Int = 0
-
   // A container placement strategy based on pending tasks' locality preference
   private[yarn] val containerPlacementStrategy =
-    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)
+    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resolver)
-  def getNumExecutorsRunning: Int = runningExecutors.size()
+  // The default profile is always present so we need to initialize the data structures keyed by
+  // ResourceProfile id to ensure it's present if things start running before a request for
+  // executors could add it. This approach is easier than going and special-casing everywhere.
+  private def initDefaultProfile(): Unit = synchronized {
+    allocatedHostToContainersMapPerRPId(DEFAULT_RESOURCE_PROFILE_ID) =
+      new HashMap[String, mutable.Set[ContainerId]]()
+    runningExecutorsPerResourceProfileId.put(DEFAULT_RESOURCE_PROFILE_ID, mutable.HashSet[String]())
+    numExecutorsStartingPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = new AtomicInteger(0)
+    targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) =
+      SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
+    rpIdToYarnResource.put(DEFAULT_RESOURCE_PROFILE_ID, defaultResource)
+    rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) =
+      ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+  }
-  def getNumReleasedContainers: Int = releasedContainers.size()
+  initDefaultProfile()
+
+  def getNumExecutorsRunning: Int = synchronized {
+    runningExecutorsPerResourceProfileId.values.map(_.size).sum
+  }
+
+  def getNumLocalityAwareTasks: Int = synchronized {
+    numLocalityAwareTasksPerResourceProfileId.values.sum
+  }
+
+  def getNumExecutorsStarting: Int = synchronized {
+    numExecutorsStartingPerResourceProfileId.values.map(_.get()).sum
+  }
+
+  def getNumReleasedContainers: Int = synchronized {
+    releasedContainers.size
+  }
   def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors
@@ -186,49 +242,147 @@ private[yarn] class YarnAllocator(
   /**
    * A sequence of pending container requests that have not yet been fulfilled.
+ * ResourceProfile id -> pendingAllocate container request */ - def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST) + def getPendingAllocate: Map[Int, Seq[ContainerRequest]] = getPendingAtLocation(ANY_HOST) - def numContainersPendingAllocate: Int = synchronized { - getPendingAllocate.size + def getNumContainersPendingAllocate: Int = synchronized { + getPendingAllocate.values.flatten.size + } + + // YARN priorities are such that lower number is higher priority. + // We need to allocate a different priority for each ResourceProfile because YARN + // won't allow different container resource requirements within a Priority. + // We could allocate per Stage to make sure earlier stages get priority but Spark + // always finishes a stage before starting a later one and if we have 2 running in parallel + // the priority doesn't matter. + // We are using the ResourceProfile id as the priority. + private def getContainerPriority(rpId: Int): Priority = { + Priority.newInstance(rpId) + } + + // The ResourceProfile id is the priority + private def getResourceProfileIdFromPriority(priority: Priority): Int = { + priority.getPriority() + } + + private def getOrUpdateAllocatedHostToContainersMapForRPId( + rpId: Int): HashMap[String, collection.mutable.Set[ContainerId]] = synchronized { + allocatedHostToContainersMapPerRPId.getOrElseUpdate(rpId, + new HashMap[String, mutable.Set[ContainerId]]()) + } + + private def getOrUpdateRunningExecutorForRPId(rpId: Int): mutable.Set[String] = synchronized { + runningExecutorsPerResourceProfileId.getOrElseUpdate(rpId, mutable.HashSet[String]()) + } + + private def getOrUpdateNumExecutorsStartingForRPId(rpId: Int): AtomicInteger = synchronized { + numExecutorsStartingPerResourceProfileId.getOrElseUpdate(rpId, new AtomicInteger(0)) + } + + private def getOrUpdateTargetNumExecutorsForRPId(rpId: Int): Int = synchronized { + targetNumExecutorsPerResourceProfileId.getOrElseUpdate(rpId, + SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)) } /** - * A sequence of pending container requests at the given location that have not yet been - * fulfilled. + * A sequence of pending container requests at the given location for each ResourceProfile id + * that have not yet been fulfilled. 
*/ - private def getPendingAtLocation(location: String): Seq[ContainerRequest] = - amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala - .flatMap(_.asScala) + private def getPendingAtLocation( + location: String): Map[Int, Seq[ContainerRequest]] = synchronized { + val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]] + rpIdToResourceProfile.keys.map { id => + val profResource = rpIdToYarnResource.get(id) + val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource) + .asScala.flatMap(_.asScala) + allContainerRequests(id) = result + } + allContainerRequests.toMap + } + + // if a ResourceProfile hasn't been seen yet, create the corresponding YARN Resource for it + private def createYarnResourceForResourceProfile( + resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = synchronized { + resourceProfileToTotalExecs.foreach { case (rp, num) => + if (!rpIdToYarnResource.contains(rp.id)) { + // Start with the application or default settings + var heapMem = executorMemory.toLong + // Note we currently don't support off heap memory in ResourceProfile - SPARK-30794 + var offHeapMem = executorOffHeapMemory.toLong + var overheadMem = memoryOverhead.toLong + var pysparkMem = pysparkWorkerMemory.toLong + var cores = defaultExecutorCores + val customResources = new mutable.HashMap[String, String] + // track the resource profile if not already there + getOrUpdateRunningExecutorForRPId(rp.id) + logInfo(s"Resource profile ${rp.id} doesn't exist, adding it") + val execResources = rp.executorResources + execResources.foreach { case (r, execReq) => + r match { + case ResourceProfile.MEMORY => + heapMem = execReq.amount + case ResourceProfile.OVERHEAD_MEM => + overheadMem = execReq.amount + case ResourceProfile.PYSPARK_MEM => + pysparkMem = execReq.amount + case ResourceProfile.CORES => + cores = execReq.amount.toInt + case "gpu" => + customResources(YARN_GPU_RESOURCE_CONFIG) = execReq.amount.toString + case "fpga" => + customResources(YARN_FPGA_RESOURCE_CONFIG) = execReq.amount.toString + case rName => + customResources(rName) = execReq.amount.toString + } + } + val totalMem = (heapMem + offHeapMem + overheadMem + pysparkMem).toInt + val resource = Resource.newInstance(totalMem, cores) + ResourceRequestHelper.setResourceRequests(customResources.toMap, resource) + logDebug(s"Created resource capability: $resource") + rpIdToYarnResource.putIfAbsent(rp.id, resource) + rpIdToResourceProfile(rp.id) = rp + } + } + } /** * Request as many executors from the ResourceManager as needed to reach the desired total. If * the requested total is smaller than the current number of running executors, no executors will * be killed. - * @param requestedTotal total number of containers requested - * @param localityAwareTasks number of locality aware tasks to be used as container placement hint - * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as - * container placement hint. + * @param resourceProfileToTotalExecs total number of containers requested for each + * ResourceProfile + * @param numLocalityAwareTasksPerResourceProfileId number of locality aware tasks for each + * ResourceProfile id to be used as container + * placement hint. + * @param hostToLocalTaskCount a map of preferred hostname to possible task counts for each + * ResourceProfile id to be used as container placement hint. * @param nodeBlacklist blacklisted nodes, which is passed in to avoid allocating new containers * on them. 
It will be used to update the application master's blacklist. * @return Whether the new requested total is different than the old value. */ def requestTotalExecutorsWithPreferredLocalities( - requestedTotal: Int, - localityAwareTasks: Int, - hostToLocalTaskCount: Map[String, Int], + resourceProfileToTotalExecs: Map[ResourceProfile, Int], + numLocalityAwareTasksPerResourceProfileId: Map[Int, Int], + hostToLocalTaskCountPerResourceProfileId: Map[Int, Map[String, Int]], nodeBlacklist: Set[String]): Boolean = synchronized { - this.numLocalityAwareTasks = localityAwareTasks - this.hostToLocalTaskCounts = hostToLocalTaskCount + this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId + this.hostToLocalTaskCountPerResourceProfileId = hostToLocalTaskCountPerResourceProfileId - if (requestedTotal != targetNumExecutors) { - logInfo(s"Driver requested a total number of $requestedTotal executor(s).") - targetNumExecutors = requestedTotal - allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist) - true - } else { - false + createYarnResourceForResourceProfile(resourceProfileToTotalExecs) + + val res = resourceProfileToTotalExecs.map { case (rp, numExecs) => + if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) { + logInfo(s"Driver requested a total number of $numExecs executor(s) " + + s"for resource profile id: ${rp.id}.") + targetNumExecutorsPerResourceProfileId(rp.id) = numExecs + allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist) + true + } else { + false + } } + res.exists(_ == true) } /** @@ -237,8 +391,9 @@ private[yarn] class YarnAllocator( def killExecutor(executorId: String): Unit = synchronized { executorIdToContainer.get(executorId) match { case Some(container) if !releasedContainers.contains(container.getId) => + val (_, rpId) = containerIdToExecutorIdAndResourceProfileId(container.getId) internalReleaseContainer(container) - runningExecutors.remove(executorId) + getOrUpdateRunningExecutorForRPId(rpId).remove(executorId) case _ => logWarning(s"Attempted to kill unknown executor $executorId!") } } @@ -267,8 +422,8 @@ private[yarn] class YarnAllocator( "Launching executor count: %d. Cluster resources: %s.") .format( allocatedContainers.size, - runningExecutors.size, - numExecutorsStarting.get, + getNumExecutorsRunning, + getNumExecutorsStarting, allocateResponse.getAvailableResources)) handleAllocatedContainers(allocatedContainers.asScala) @@ -279,108 +434,122 @@ private[yarn] class YarnAllocator( logDebug("Completed %d containers".format(completedContainers.size)) processCompletedContainers(completedContainers.asScala) logDebug("Finished processing %d completed containers. Current running executor count: %d." - .format(completedContainers.size, runningExecutors.size)) + .format(completedContainers.size, getNumExecutorsRunning)) } } /** * Update the set of container requests that we will sync with the RM based on the number of - * executors we have currently running and our target number of executors. + * executors we have currently running and our target number of executors for each + * ResourceProfile. * * Visible for testing. 
*/ - def updateResourceRequests(): Unit = { - val pendingAllocate = getPendingAllocate - val numPendingAllocate = pendingAllocate.size - val missing = targetNumExecutors - numPendingAllocate - - numExecutorsStarting.get - runningExecutors.size - logDebug(s"Updating resource requests, target: $targetNumExecutors, " + - s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " + - s"executorsStarting: ${numExecutorsStarting.get}") + def updateResourceRequests(): Unit = synchronized { + val pendingAllocatePerResourceProfileId = getPendingAllocate + val missingPerProfile = targetNumExecutorsPerResourceProfileId.map { case (rpId, targetNum) => + val starting = getOrUpdateNumExecutorsStartingForRPId(rpId).get + val pending = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty).size + val running = getOrUpdateRunningExecutorForRPId(rpId).size + logDebug(s"Updating resource requests for ResourceProfile id: $rpId, target: " + + s"$targetNum, pending: $pending, running: $running, executorsStarting: $starting") + (rpId, targetNum - pending - running - starting) + }.toMap - // Split the pending container request into three groups: locality matched list, locality - // unmatched list and non-locality list. Take the locality matched container request into - // consideration of container placement, treat as allocated containers. - // For locality unmatched and locality free container requests, cancel these container - // requests, since required locality preference has been changed, recalculating using - // container placement strategy. - val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( - hostToLocalTaskCounts, pendingAllocate) + missingPerProfile.foreach { case (rpId, missing) => + val hostToLocalTaskCount = + hostToLocalTaskCountPerResourceProfileId.getOrElse(rpId, Map.empty) + val pendingAllocate = pendingAllocatePerResourceProfileId.getOrElse(rpId, Seq.empty) + val numPendingAllocate = pendingAllocate.size + // Split the pending container request into three groups: locality matched list, locality + // unmatched list and non-locality list. Take the locality matched container request into + // consideration of container placement, treat as allocated containers. + // For locality unmatched and locality free container requests, cancel these container + // requests, since required locality preference has been changed, recalculating using + // container placement strategy. 
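      // As a hypothetical illustration of the three-way split: if the outstanding requests for
      // this profile name host1, host2 and host7 while the current task preferences only cover
      // host1 and host2, the host1/host2 requests are "locality matched" (treated as already
      // placed), the host7 request is "stale" and will be cancelled, and any request with no node
      // list at all falls into the non-locality group, some of which may also be cancelled to make
      // room for better-placed requests.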
+ val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( + hostToLocalTaskCount, pendingAllocate) - if (missing > 0) { - if (log.isInfoEnabled()) { - var requestContainerMessage = s"Will request $missing executor container(s), each with " + + if (missing > 0) { + val resource = rpIdToYarnResource.get(rpId) + if (log.isInfoEnabled()) { + var requestContainerMessage = s"Will request $missing executor container(s) for " + + s" ResourceProfile Id: $rpId, each with " + s"${resource.getVirtualCores} core(s) and " + s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)" - if (ResourceRequestHelper.isYarnResourceTypesAvailable() && - executorResourceRequests.nonEmpty) { - requestContainerMessage ++= s" with custom resources: " + resource.toString + if (ResourceRequestHelper.isYarnResourceTypesAvailable() && + ResourceRequestHelper.isYarnCustomResourcesNonEmpty(resource)) { + requestContainerMessage ++= s" with custom resources: " + resource.toString + } + logInfo(requestContainerMessage) } - logInfo(requestContainerMessage) - } - // cancel "stale" requests for locations that are no longer needed - staleRequests.foreach { stale => - amClient.removeContainerRequest(stale) - } - val cancelledContainers = staleRequests.size - if (cancelledContainers > 0) { - logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)") - } - - // consider the number of new containers and cancelled stale containers available - val availableContainers = missing + cancelledContainers - - // to maximize locality, include requests with no locality preference that can be cancelled - val potentialContainers = availableContainers + anyHostRequests.size - - val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers( - potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts, - allocatedHostToContainersMap, localRequests) - - val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest] - containerLocalityPreferences.foreach { - case ContainerLocalityPreferences(nodes, racks) if nodes != null => - newLocalityRequests += createContainerRequest(resource, nodes, racks) - case _ => - } - - if (availableContainers >= newLocalityRequests.size) { - // more containers are available than needed for locality, fill in requests for any host - for (i <- 0 until (availableContainers - newLocalityRequests.size)) { - newLocalityRequests += createContainerRequest(resource, null, null) + // cancel "stale" requests for locations that are no longer needed + staleRequests.foreach { stale => + amClient.removeContainerRequest(stale) } - } else { - val numToCancel = newLocalityRequests.size - availableContainers - // cancel some requests without locality preferences to schedule more local containers - anyHostRequests.slice(0, numToCancel).foreach { nonLocal => - amClient.removeContainerRequest(nonLocal) + val cancelledContainers = staleRequests.size + if (cancelledContainers > 0) { + logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)") } - if (numToCancel > 0) { - logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality") - } - } - newLocalityRequests.foreach { request => - amClient.addContainerRequest(request) - } + // consider the number of new containers and cancelled stale containers available + val availableContainers = missing + cancelledContainers - if (log.isInfoEnabled()) { - val (localized, anyHost) = 
newLocalityRequests.partition(_.getNodes() != null) - if (anyHost.nonEmpty) { - logInfo(s"Submitted ${anyHost.size} unlocalized container requests.") + // to maximize locality, include requests with no locality preference that can be cancelled + val potentialContainers = availableContainers + anyHostRequests.size + + val allocatedHostToContainer = getOrUpdateAllocatedHostToContainersMapForRPId(rpId) + val numLocalityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(rpId, 0) + val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers( + potentialContainers, numLocalityAwareTasks, hostToLocalTaskCount, + allocatedHostToContainer, localRequests, rpIdToResourceProfile(rpId)) + + val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest] + containerLocalityPreferences.foreach { + case ContainerLocalityPreferences(nodes, racks) if nodes != null => + newLocalityRequests += createContainerRequest(resource, nodes, racks, rpId) + case _ => } - localized.foreach { request => - logInfo(s"Submitted container request for host ${hostStr(request)}.") + + if (availableContainers >= newLocalityRequests.size) { + // more containers are available than needed for locality, fill in requests for any host + for (i <- 0 until (availableContainers - newLocalityRequests.size)) { + newLocalityRequests += createContainerRequest(resource, null, null, rpId) + } + } else { + val numToCancel = newLocalityRequests.size - availableContainers + // cancel some requests without locality preferences to schedule more local containers + anyHostRequests.slice(0, numToCancel).foreach { nonLocal => + amClient.removeContainerRequest(nonLocal) + } + if (numToCancel > 0) { + logInfo(s"Canceled $numToCancel unlocalized container requests to " + + s"resubmit with locality") + } } + + newLocalityRequests.foreach { request => + amClient.addContainerRequest(request) + } + + if (log.isInfoEnabled()) { + val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null) + if (anyHost.nonEmpty) { + logInfo(s"Submitted ${anyHost.size} unlocalized container requests.") + } + localized.foreach { request => + logInfo(s"Submitted container request for host ${hostStr(request)}.") + } + } + } else if (numPendingAllocate > 0 && missing < 0) { + val numToCancel = math.min(numPendingAllocate, -missing) + logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new " + + s"desired total ${getOrUpdateTargetNumExecutorsForRPId(rpId)} executors.") + // cancel pending allocate requests by taking locality preference into account + val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel) + cancelRequests.foreach(amClient.removeContainerRequest) } - } else if (numPendingAllocate > 0 && missing < 0) { - val numToCancel = math.min(numPendingAllocate, -missing) - logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " + - s"total $targetNumExecutors executors.") - // cancel pending allocate requests by taking locality preference into account - val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel) - cancelRequests.foreach(amClient.removeContainerRequest) } } @@ -405,8 +574,10 @@ private[yarn] class YarnAllocator( private def createContainerRequest( resource: Resource, nodes: Array[String], - racks: Array[String]): ContainerRequest = { - new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull) + racks: Array[String], + rpId: Int): 
ContainerRequest = {
+    new ContainerRequest(resource, nodes, racks, getContainerPriority(rpId),
+      true, labelExpression.orNull)
   }
   /**
@@ -499,20 +670,17 @@ private[yarn] class YarnAllocator(
       location: String,
       containersToUse: ArrayBuffer[Container],
       remaining: ArrayBuffer[Container]): Unit = {
-    // SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the
-    // request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested
-    // memory, but use the asked vcore count for matching, effectively disabling matching on vcore
-    // count.
-    val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
-      resource.getVirtualCores)
-
-    ResourceRequestHelper.setResourceRequests(executorResourceRequests, matchingResource)
+    // Match on the exact resource we requested so there shouldn't be a mismatch,
+    // we are relying on YARN to return a container with resources no less than we requested.
+    // If we change this, or start validating the container, be sure the logic covers SPARK-6050.
+    val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
+    val resourceForRP = rpIdToYarnResource.get(rpId)
     logDebug(s"Calling amClient.getMatchingRequests with parameters: " +
         s"priority: ${allocatedContainer.getPriority}, " +
-        s"location: $location, resource: $matchingResource")
+        s"location: $location, resource: $resourceForRP")
     val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
-      matchingResource)
+      resourceForRP)
     // Match the allocation to a request
     if (!matchingRequests.isEmpty) {
@@ -528,30 +696,38 @@ private[yarn] class YarnAllocator(
   /**
   * Launches executors in the allocated containers.
   */
-  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
+  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
     for (container <- containersToUse) {
+      val rpId = getResourceProfileIdFromPriority(container.getPriority)
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
       val executorId = executorIdCounter.toString
-      assert(container.getResource.getMemory >= resource.getMemory)
+      val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
+      assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
       logInfo(s"Launching container $containerId on host $executorHostname " +
-        s"for executor with ID $executorId")
+        s"for executor with ID $executorId for ResourceProfile Id $rpId")
       def updateInternalState(): Unit = synchronized {
-        runningExecutors.add(executorId)
-        numExecutorsStarting.decrementAndGet()
+        getOrUpdateRunningExecutorForRPId(rpId).add(executorId)
+        getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
         executorIdToContainer(executorId) = container
-        containerIdToExecutorId(container.getId) = executorId
+        containerIdToExecutorIdAndResourceProfileId(container.getId) = (executorId, rpId)
-        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+        val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
+        val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname,
          new HashSet[ContainerId])
        containerSet += containerId
        allocatedContainerToHostMap.put(containerId, executorHostname)
      }
-      if (runningExecutors.size() < targetNumExecutors) {
-        numExecutorsStarting.incrementAndGet()
+      val rp = rpIdToResourceProfile(rpId)
+      val containerMem =
rp.executorResources.get(ResourceProfile.MEMORY). + map(_.amount.toInt).getOrElse(executorMemory) + val containerCores = rp.getExecutorCores.getOrElse(defaultExecutorCores) + val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size + if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) { + getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet() if (launchContainers) { launcherPool.execute(() => { try { @@ -562,17 +738,17 @@ private[yarn] class YarnAllocator( driverUrl, executorId, executorHostname, - executorMemory, - executorCores, + containerMem, + containerCores, appAttemptId.getApplicationId.toString, securityMgr, localResources, - ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported + rp.id ).run() updateInternalState() } catch { case e: Throwable => - numExecutorsStarting.decrementAndGet() + getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() if (NonFatal(e)) { logError(s"Failed to launch executor $executorId on container $containerId", e) // Assigned container should be released immediately @@ -589,24 +765,28 @@ private[yarn] class YarnAllocator( } } else { logInfo(("Skip launching executorRunnable as running executors count: %d " + - "reached target executors count: %d.").format( - runningExecutors.size, targetNumExecutors)) + "reached target executors count: %d.").format(rpRunningExecs, + getOrUpdateTargetNumExecutorsForRPId(rpId))) } } } // Visible for testing. - private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = { + private[yarn] def processCompletedContainers( + completedContainers: Seq[ContainerStatus]): Unit = synchronized { for (completedContainer <- completedContainers) { val containerId = completedContainer.getContainerId + val (_, rpId) = containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId, + ("", DEFAULT_RESOURCE_PROFILE_ID)) val alreadyReleased = releasedContainers.remove(containerId) val hostOpt = allocatedContainerToHostMap.get(containerId) val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("") val exitReason = if (!alreadyReleased) { // Decrement the number of executors running. The next iteration of // the ApplicationMaster's reporting thread will take care of allocating. 
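        // Because executors are now tracked per ResourceProfile, the rpId recovered above from
        // containerIdToExecutorIdAndResourceProfileId is what routes this decrement to the right
        // per-profile running set rather than a single global counter.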
- containerIdToExecutorId.get(containerId) match { - case Some(executorId) => runningExecutors.remove(executorId) + containerIdToExecutorIdAndResourceProfileId.get(containerId) match { + case Some((executorId, _)) => + getOrUpdateRunningExecutorForRPId(rpId).remove(executorId) case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}") } @@ -679,19 +859,19 @@ private[yarn] class YarnAllocator( for { host <- hostOpt - containerSet <- allocatedHostToContainersMap.get(host) + containerSet <- getOrUpdateAllocatedHostToContainersMapForRPId(rpId).get(host) } { containerSet.remove(containerId) if (containerSet.isEmpty) { - allocatedHostToContainersMap.remove(host) + getOrUpdateAllocatedHostToContainersMapForRPId(rpId).remove(host) } else { - allocatedHostToContainersMap.update(host, containerSet) + getOrUpdateAllocatedHostToContainersMapForRPId(rpId).update(host, containerSet) } allocatedContainerToHostMap.remove(containerId) } - containerIdToExecutorId.remove(containerId).foreach { eid => + containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) => executorIdToContainer.remove(eid) pendingLossReasonRequests.remove(eid) match { case Some(pendingRequests) => @@ -737,12 +917,14 @@ private[yarn] class YarnAllocator( } } - private def internalReleaseContainer(container: Container): Unit = { + private def internalReleaseContainer(container: Container): Unit = synchronized { releasedContainers.add(container.getId()) amClient.releaseAssignedContainer(container.getId()) } - private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease + private[yarn] def getNumUnexpectedContainerRelease: Long = synchronized { + numUnexpectedContainerRelease + } private[yarn] def getNumPendingLossReasonRequests: Int = synchronized { pendingLossReasonRequests.size diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index f8bbc39c8b..e428bab4f9 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -130,12 +130,8 @@ private[spark] abstract class YarnSchedulerBackend( val filteredRPHostToLocalTaskCount = rpHostToLocalTaskCount.map { case (rpid, v) => (rpid, v.filter { case (host, count) => !nodeBlacklist.contains(host) }) } - // TODO - default everything to default profile until YARN pieces - val defaultProf = ResourceProfile.getOrCreateDefaultProfile(conf) - val hostToLocalTaskCount = filteredRPHostToLocalTaskCount.getOrElse(defaultProf.id, Map.empty) - val localityAwareTasks = numLocalityAwareTasksPerResourceProfileId.getOrElse(defaultProf.id, 0) - val numExecutors = resourceProfileToTotalExecs.getOrElse(defaultProf, 0) - RequestExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount, nodeBlacklist) + RequestExecutors(resourceProfileToTotalExecs, numLocalityAwareTasksPerResourceProfileId, + filteredRPHostToLocalTaskCount, nodeBlacklist) } /** diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala index 29f1c0512f..d83a0d2efe 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala 
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.deploy.yarn +import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.scalatest.{BeforeAndAfterEach, Matchers} +import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite +import org.apache.spark.resource.ResourceProfile class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { @@ -28,7 +31,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B import yarnAllocatorSuite._ def createContainerRequest(nodes: Array[String]): ContainerRequest = - new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) + new ContainerRequest(containerResource, nodes, null, Priority.newInstance(1)) override def beforeEach(): Unit = { yarnAllocatorSuite.beforeEach() @@ -38,18 +41,22 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B yarnAllocatorSuite.afterEach() } + val defaultResourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID + test("allocate locality preferred containers with enough resource and no matched existed " + "containers") { // 1. All the locations of current containers cannot satisfy the new requirements // 2. Current requested container number can fully satisfy the pending tasks. - val handler = createAllocator(2) + val (handler, allocatorConf) = createAllocator(2) handler.updateResourceRequests() handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2"))) + ResourceProfile.clearDefaultProfile + val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), - handler.allocatedHostToContainersMap, Seq.empty) + handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp) assert(localities.map(_.nodes) === Array( Array("host3", "host4", "host5"), @@ -62,7 +69,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B // 1. Parts of current containers' locations can satisfy the new requirements // 2. Current requested container number can fully satisfy the pending tasks. - val handler = createAllocator(3) + val (handler, allocatorConf) = createAllocator(3) handler.updateResourceRequests() handler.handleAllocatedContainers(Array( createContainer("host1"), @@ -70,9 +77,12 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B createContainer("host2") )) + ResourceProfile.clearDefaultProfile + val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf) + val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), - handler.allocatedHostToContainersMap, Seq.empty) + handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp) assert(localities.map(_.nodes) === Array(null, Array("host2", "host3"), Array("host2", "host3"))) @@ -83,7 +93,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B // 1. Parts of current containers' locations can satisfy the new requirements // 2. Current requested container number cannot fully satisfy the pending tasks. 
- val handler = createAllocator(3) + val (handler, allocatorConf) = createAllocator(3) handler.updateResourceRequests() handler.handleAllocatedContainers(Array( createContainer("host1"), @@ -91,9 +101,11 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B createContainer("host2") )) + ResourceProfile.clearDefaultProfile + val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), - handler.allocatedHostToContainersMap, Seq.empty) + handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp) assert(localities.map(_.nodes) === Array(Array("host2", "host3"))) } @@ -101,7 +113,7 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B test("allocate locality preferred containers with fully matched containers") { // Current containers' locations can fully satisfy the new requirements - val handler = createAllocator(5) + val (handler, allocatorConf) = createAllocator(5) handler.updateResourceRequests() handler.handleAllocatedContainers(Array( createContainer("host1"), @@ -111,9 +123,11 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B createContainer("host3") )) + ResourceProfile.clearDefaultProfile + val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), - handler.allocatedHostToContainersMap, Seq.empty) + handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp) assert(localities.map(_.nodes) === Array(null, null, null)) } @@ -121,18 +135,21 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B test("allocate containers with no locality preference") { // Request new container without locality preference - val handler = createAllocator(2) + val (handler, allocatorConf) = createAllocator(2) handler.updateResourceRequests() handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2"))) + ResourceProfile.clearDefaultProfile + val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty) + 1, 0, Map.empty, + handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), Seq.empty, rp) assert(localities.map(_.nodes) === Array(null)) } test("allocate locality preferred containers by considering the localities of pending requests") { - val handler = createAllocator(3) + val (handler, allocatorConf) = createAllocator(3) handler.updateResourceRequests() handler.handleAllocatedContainers(Array( createContainer("host1"), @@ -144,9 +161,12 @@ class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with B createContainerRequest(Array("host2", "host3")), createContainerRequest(Array("host1", "host4"))) + ResourceProfile.clearDefaultProfile + val rp = ResourceProfile.getOrCreateDefaultProfile(allocatorConf) val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), - handler.allocatedHostToContainersMap, pendingAllocationRequests) + handler.allocatedHostToContainersMapPerRPId(defaultResourceProfileId), + pendingAllocationRequests, rp) 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
index b7f25656e4..727851747e 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Mockito._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.resource.ResourceProfile
 
 class LocalityPlacementStrategySuite extends SparkFunSuite {
 
@@ -58,7 +59,7 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
     val resource = Resource.newInstance(8 * 1024, 4)
     val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
-      yarnConf, resource, new MockResolver())
+      yarnConf, new MockResolver())
 
     val totalTasks = 32 * 1024
     val totalContainers = totalTasks / 16
@@ -75,9 +76,10 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
       containers.drop(count * i).take(i).foreach { c => hostContainers += c }
       hostToContainerMap(host) = hostContainers
     }
+    val rp = ResourceProfile.getOrCreateDefaultProfile(new SparkConf)
 
     strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
-      hostToContainerMap, Nil)
+      hostToContainerMap, Nil, rp)
   }
 
 }
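The LocalityPlacementStrategySuite hunk shows the same shift from a different angle: the container Resource is no longer fixed when the strategy is constructed, so the ResourceProfile travels with each localityOfRequestedContainers call instead. Because the default profile is cached process-wide, tests rebuild it from the conf they actually exercise. A small sketch of that pattern; the helper name freshDefaultProfile is invented here purely for illustration:

  import org.apache.spark.SparkConf
  import org.apache.spark.resource.ResourceProfile

  // Illustrative helper: give a test a default profile derived from its own conf,
  // instead of whatever an earlier test left cached in the ResourceProfile object.
  def freshDefaultProfile(conf: SparkConf): ResourceProfile = {
    ResourceProfile.clearDefaultProfile
    ResourceProfile.getOrCreateDefaultProfile(conf)
  }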
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6216d47388..2003d0bb87 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.util.Collections
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
@@ -32,9 +33,9 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.ResourceRequestHelper._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils.{AMOUNT, GPU}
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEndpointRef
@@ -69,6 +70,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   var containerNum = 0
 
+  // priority has to be 0 to match default profile id
+  val RM_REQUEST_PRIORITY = Priority.newInstance(0)
+  val defaultRPId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+  val defaultRP = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     rmClient = AMRMClient.createAMRMClient()
@@ -93,7 +99,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   def createAllocator(
       maxExecutors: Int = 5,
       rmClient: AMRMClient[ContainerRequest] = rmClient,
-      additionalConfigs: Map[String, String] = Map()): YarnAllocator = {
+      additionalConfigs: Map[String, String] = Map()): (YarnAllocator, SparkConf) = {
     val args = Array(
       "--jar", "somejar.jar",
       "--class", "SomeClass")
@@ -107,7 +113,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConfClone.set(name, value)
     }
 
-    new YarnAllocator(
+    val allocator = new YarnAllocator(
       "not used",
       mock(classOf[RpcEndpointRef]),
       conf,
@@ -118,16 +124,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(),
       new MockResolver(),
       clock)
+    (allocator, sparkConfClone)
   }
 
   def createContainer(
       host: String,
       containerNumber: Int = containerNum,
-      resource: Resource = containerResource): Container = {
+      resource: Resource = containerResource,
+      priority: Priority = RM_REQUEST_PRIORITY): Container = {
     val containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
-    Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
+    Container.newInstance(containerId, nodeId, "", resource, priority, null)
   }
 
   def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
@@ -145,37 +153,125 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   test("single container allocated") {
     // request a single container and receive it
-    val handler = createAllocator(1)
+    val (handler, _) = createAllocator(1)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
 
     val container = createContainer("host1")
     handler.handleAllocatedContainers(Array(container))
 
     handler.getNumExecutorsRunning should be (1)
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container.getId)
 
     val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
     size should be (0)
   }
 
+  test("single container allocated with ResourceProfile") {
+    assume(isYarnResourceTypesAvailable())
+    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG)
+    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+    // create default profile so we get a different id to test below
+    val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val execReq = new ExecutorResourceRequests().resource("gpu", 6)
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
+    // request a single container and receive it
+    val (handler, _) = createAllocator(0)
+
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
+
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumContainersPendingAllocate should be (1)
+
+    val container = createContainer("host1", priority = Priority.newInstance(rprof.id))
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
+    hostTocontainer.get("host1").get should contain(container.getId)
+
+    val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
+    size should be (0)
+
+    ResourceProfile.reInitDefaultProfile(sparkConf)
+  }
+
+  test("multiple containers allocated with ResourceProfiles") {
+    assume(isYarnResourceTypesAvailable())
+    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG)
+    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+    // create default profile so we get a different id to test below
+    val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val execReq = new ExecutorResourceRequests().resource("gpu", 6)
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
+
+    val execReq2 = new ExecutorResourceRequests().memory("8g").resource("fpga", 2)
+    val taskReq2 = new TaskResourceRequests().resource("fpga", 1)
+    val rprof2 = new ResourceProfile(execReq2.requests, taskReq2.requests)
+
+
+    // request a single container and receive it
+    val (handler, _) = createAllocator(1)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1, rprof2 -> 2)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0, rprof2.id -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
+
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumContainersPendingAllocate should be (3)
+
+    val containerResourcerp2 = Resource.newInstance(10240, 5)
+
+    val container = createContainer("host1", priority = Priority.newInstance(rprof.id))
+    val container2 = createContainer("host2", resource = containerResourcerp2,
+      priority = Priority.newInstance(rprof2.id))
+    val container3 = createContainer("host3", resource = containerResourcerp2,
+      priority = Priority.newInstance(rprof2.id))
+    handler.handleAllocatedContainers(Array(container, container2, container3))
+
+    handler.getNumExecutorsRunning should be (3)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
+    handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host3")
+
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
+    hostTocontainer.get("host1").get should contain(container.getId)
+    val hostTocontainer2 = handler.allocatedHostToContainersMapPerRPId(rprof2.id)
+    hostTocontainer2.get("host2").get should contain(container2.getId)
+    hostTocontainer2.get("host3").get should contain(container3.getId)
+
+    val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size
+    size should be (0)
+
+    ResourceProfile.reInitDefaultProfile(sparkConf)
+  }
+
   test("custom resource requested from yarn") {
     assume(isYarnResourceTypesAvailable())
     ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
 
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val handler = createAllocator(1, mockAmClient,
+    val (handler, _) = createAllocator(1, mockAmClient,
       Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
     handler.updateResourceRequests()
 
-    val container = createContainer("host1", resource = handler.resource)
+    val container = createContainer("host1", resource = handler.defaultResource)
     handler.handleAllocatedContainers(Array(container))
 
     // get amount of memory and vcores from resource, so effectively skipping their validation
-    val expectedResources = Resource.newInstance(handler.resource.getMemory(),
-      handler.resource.getVirtualCores)
+    val expectedResources = Resource.newInstance(handler.defaultResource.getMemory(),
+      handler.defaultResource.getVirtualCores)
     setResourceRequests(Map("gpu" -> "2G"), expectedResources)
 
     val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
@@ -195,10 +291,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(EXECUTOR_GPU_ID.amountConf -> "3",
         EXECUTOR_FPGA_ID.amountConf -> "2",
         madeupConfigName -> "5")
-    val handler = createAllocator(1, mockAmClient, sparkResources)
+    val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
     handler.updateResourceRequests()
 
-    val yarnRInfo = ResourceRequestTestHelper.getResources(handler.resource)
+    val yarnRInfo = ResourceRequestTestHelper.getResources(handler.defaultResource)
     val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) ).toMap
     assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
     assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
@@ -210,17 +306,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   test("container should not be created if requested number if met") {
     // request a single container and receive it
-    val handler = createAllocator(1)
+    val (handler, _) = createAllocator(1)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
 
     val container = createContainer("host1")
     handler.handleAllocatedContainers(Array(container))
 
     handler.getNumExecutorsRunning should be (1)
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container.getId)
 
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container2))
@@ -229,10 +326,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
   test("some containers allocated") {
     // request a few containers and receive some of them
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host1")
@@ -243,16 +340,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
     handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
     handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
-    handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container1.getId)
+    hostTocontainer.get("host1").get should contain (container2.getId)
+    hostTocontainer.get("host2").get should contain (container3.getId)
   }
 
   test("receive more containers than requested") {
-    val handler = createAllocator(2)
+    val (handler, _) = createAllocator(2)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
+    handler.getNumContainersPendingAllocate should be (2)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
@@ -263,42 +361,52 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
     handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
     handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
-    handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
-    handler.allocatedHostToContainersMap.contains("host4") should be (false)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container1.getId)
+    hostTocontainer.get("host2").get should contain (container2.getId)
+    hostTocontainer.contains("host4") should be (false)
   }
 
   test("decrease total requested executors") {
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
    handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 3)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (3)
+    handler.getNumContainersPendingAllocate should be (3)
 
     val container = createContainer("host1")
     handler.handleAllocatedContainers(Array(container))
 
     handler.getNumExecutorsRunning should be (1)
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
-    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
+    hostTocontainer.get("host1").get should contain(container.getId)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
+    resourceProfileToTotalExecs(defaultRP) = 2
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
   }
 
   test("decrease total requested executors to less than currently running") {
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 3)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (3)
+    handler.getNumContainersPendingAllocate should be (3)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
@@ -306,23 +414,28 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
     handler.getNumExecutorsRunning should be (2)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
+    resourceProfileToTotalExecs(defaultRP) = 1
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (0)
+    handler.getNumContainersPendingAllocate should be (0)
     handler.getNumExecutorsRunning should be (2)
   }
 
   test("kill executors") {
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
 
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 1)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) }
 
     val statuses = Seq(container1, container2).map { c =>
@@ -331,20 +444,20 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.updateResourceRequests()
     handler.processCompletedContainers(statuses)
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
   }
 
   test("kill same executor multiple times") {
-    val handler = createAllocator(2)
+    val (handler, _) = createAllocator(2)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
+    handler.getNumContainersPendingAllocate should be (2)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
     handler.getNumExecutorsRunning should be (2)
-    handler.getPendingAllocate.size should be (0)
+    handler.getNumContainersPendingAllocate should be (0)
 
     val executorToKill = handler.executorIdToContainer.keys.head
     handler.killExecutor(executorToKill)
@@ -353,22 +466,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.killExecutor(executorToKill)
     handler.killExecutor(executorToKill)
     handler.getNumExecutorsRunning should be (1)
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
     handler.updateResourceRequests()
-    handler.getPendingAllocate.size should be (1)
+    handler.getNumContainersPendingAllocate should be (1)
   }
 
   test("process same completed container multiple times") {
-    val handler = createAllocator(2)
+    val (handler, _) = createAllocator(2)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
+    handler.getNumContainersPendingAllocate should be (2)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
     handler.getNumExecutorsRunning should be (2)
-    handler.getPendingAllocate.size should be (0)
+    handler.getNumContainersPendingAllocate should be (0)
 
     val statuses = Seq(container1, container1, container2).map { c =>
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
@@ -379,16 +495,19 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }
 
   test("lost executor removed from backend") {
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
 
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set.empty)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
 
     val statuses = Seq(container1, container2).map { c =>
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
@@ -397,7 +516,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.processCompletedContainers(statuses)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (2)
+    handler.getNumContainersPendingAllocate should be (2)
     handler.getNumExecutorsFailed should be (2)
     handler.getNumUnexpectedContainerRelease should be (2)
   }
@@ -406,28 +525,35 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     // Internally we track the set of blacklisted nodes, but yarn wants us to send *changes*
     // to the blacklist.  This makes sure we are sending the right updates.
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val handler = createAllocator(4, mockAmClient)
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
+    val (handler, _) = createAllocator(4, mockAmClient)
+    val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 1)
+    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set("hostA"))
     verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq[String]().asJava)
 
     val blacklistedNodes = Set(
       "hostA",
       "hostB"
     )
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), blacklistedNodes)
-    verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq[String]().asJava)
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set.empty)
+    resourceProfileToTotalExecs(defaultRP) = 2
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map(), blacklistedNodes)
+    verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq[String]().asJava)
+    resourceProfileToTotalExecs(defaultRP) = 3
+    handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+      numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
     verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava)
   }
 
   test("window based failure executor counting") {
     sparkConf.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100 * 1000L)
-    val handler = createAllocator(4)
+    val (handler, _) = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getPendingAllocate.size should be (4)
+    handler.getNumContainersPendingAllocate should be (4)
 
     val containers = Seq(
       createContainer("host1"),
@@ -468,7 +594,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val rmClientSpy = spy(rmClient)
     val maxExecutors = 11
 
-    val handler = createAllocator(
+    val (handler, _) = createAllocator(
       maxExecutors,
       rmClientSpy,
       Map(
@@ -525,9 +651,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     try {
       sparkConf.set(MEMORY_OFFHEAP_ENABLED, true)
       sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
-      val allocator = createAllocator(maxExecutors = 1,
+      val (handler, _) = createAllocator(maxExecutors = 1,
        additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
-      val memory = allocator.resource.getMemory
+      val memory = handler.defaultResource.getMemory
       assert(memory ==
         executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
     } finally {
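Taken together, the YarnAllocatorSuite changes funnel through one new calling convention: executor totals, locality-aware task counts, and host maps are keyed by ResourceProfile (or its id), and the YARN request Priority carries the profile id so allocated containers can be matched back to the profile that asked for them, which is why the suite notes that the default profile has to line up with priority 0. A compressed sketch of that convention, assuming a YarnAllocator and SparkConf wired up as in this suite (the helper requestMixedProfiles is invented for illustration):

  import org.apache.hadoop.yarn.api.records.Priority
  import org.apache.spark.SparkConf
  import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}

  // Sketch: request executors for the default profile and a GPU profile at once.
  def requestMixedProfiles(handler: YarnAllocator, conf: SparkConf): Priority = {
    val execReq = new ExecutorResourceRequests().resource("gpu", 6)
    val taskReq = new TaskResourceRequests().resource("gpu", 1)
    val gpuProfile = new ResourceProfile(execReq.requests, taskReq.requests)
    val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(conf)

    handler.requestTotalExecutorsWithPreferredLocalities(
      Map(defaultProfile -> 2, gpuProfile -> 1),  // total executors wanted, per profile
      Map(gpuProfile.id -> 0),                    // locality-aware task counts, per profile id
      Map.empty,                                  // host -> task-count maps, per profile id
      Set.empty)                                  // node blacklist

    // Containers allocated for this profile come back tagged with Priority == profile id.
    Priority.newInstance(gpuProfile.id)
  }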
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
index c0c6fff513..9003c2f630 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -51,9 +51,8 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
   private class TestYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
       extends YarnSchedulerBackend(scheduler, sc) {
-    def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
-      this.rpHostToLocalTaskCount = Map(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID ->
-        hostToLocalTaskCount)
+    def setHostToLocalTaskCount(hostToLocalTaskCount: Map[Int, Map[String, Int]]): Unit = {
+      this.rpHostToLocalTaskCount = hostToLocalTaskCount
     }
   }
 
@@ -64,21 +63,24 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
     val yarnSchedulerBackendExtended = new TestYarnSchedulerBackend(sched, sc)
     yarnSchedulerBackend = yarnSchedulerBackendExtended
     val ser = new JavaSerializer(sc.conf).newInstance()
+    val defaultResourceProf = ResourceProfile.getOrCreateDefaultProfile(sc.getConf)
     for {
       blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
       numRequested <- 0 until 10
       hostToLocalCount <- IndexedSeq(
-        Map[String, Int](),
-        Map("a" -> 1, "b" -> 2)
+        Map(defaultResourceProf.id -> Map.empty[String, Int]),
+        Map(defaultResourceProf.id -> Map("a" -> 1, "b" -> 2))
       )
     } {
       yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount)
       sched.setNodeBlacklist(blacklist)
-      val numReq = Map(ResourceProfile.getOrCreateDefaultProfile(sc.getConf) -> numRequested)
-      val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numReq)
-      assert(req.requestedTotal === numRequested)
+      val request = Map(defaultResourceProf -> numRequested)
+      val req = yarnSchedulerBackendExtended.prepareRequestExecutors(request)
+      assert(req.resourceProfileToTotalExecs(defaultResourceProf) === numRequested)
       assert(req.nodeBlacklist === blacklist)
-      assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
+      val hosts =
+        req.hostToLocalTaskCount(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID).keySet
+      assert(hosts.intersect(blacklist).isEmpty)
       // Serialize to make sure serialization doesn't throw an error
       ser.serialize(req)
     }
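On the scheduler-backend side, the same keying shows up in the test loop above: locality hints go in as a nested map keyed by profile id, and the prepared request is read back per profile (totals by ResourceProfile, host counts by profile id) before the blacklist filter is checked. A minimal sketch of building that input, assuming a SparkConf as in the suite; the helper name is illustrative only:

  import org.apache.spark.SparkConf
  import org.apache.spark.resource.ResourceProfile

  // Sketch: locality hints are now profile-id -> (host -> task count).
  def defaultProfileHostHints(
      conf: SparkConf,
      hostCounts: Map[String, Int]): Map[Int, Map[String, Int]] = {
    val defaultProf = ResourceProfile.getOrCreateDefaultProfile(conf)
    Map(defaultProf.id -> hostCounts)
  }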