[SPARK-26269][YARN] YarnAllocator should have the same blacklist behaviour as YARN to maximize use of cluster resources

## What changes were proposed in this pull request?

As mentioned in JIRA ticket [SPARK-26269](https://issues.apache.org/jira/browse/SPARK-26269), in order to maximize the use of cluster resources, this PR makes `YarnAllocator` follow the same blacklisting behaviour as YARN.
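Concretely, the rule adopted here (mirroring YARN's own node-blacklisting logic in `Apps.java`, linked from the code comment in the diff) is: exit statuses that are neither the application's fault nor evidence of a bad node should not count towards executor failures and should not blacklist the host, while any other unrecognised exit status should do both. Below is a minimal sketch of that rule, assuming the Hadoop YARN API is on the classpath; the object and method names are illustrative only, and the real change lives in `YarnAllocator.processCompletedContainers` (see the diff below):

```scala
import org.apache.hadoop.yarn.api.records.ContainerExitStatus

// Illustrative sketch only; names here are not part of the actual change.
object BlacklistRuleSketch {
  // Mirrors the NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS set introduced in YarnAllocator:
  // exit statuses that YARN regards as neither the application's fault nor proof of a bad node.
  val notAppAndSystemFault: Set[Int] = Set(
    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
    ContainerExitStatus.KILLED_BY_APPMASTER,
    ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
    ContainerExitStatus.ABORTED,
    ContainerExitStatus.DISKS_FAILED)

  /** Returns (exitCausedByApp, blacklistHost) for an exit status not handled by earlier cases. */
  def classify(exitStatus: Int): (Boolean, Boolean) =
    if (notAppAndSystemFault.contains(exitStatus)) {
      (false, false) // don't count against the app, don't blacklist the node
    } else {
      (true, true)   // unknown failure: count it and blacklist the host
    }
}
```

The new `YarnAllocatorSuite` test below exercises both sides: the five statuses in this set (plus `SUCCESS`, `PREEMPTED` and the vmem/pmem kills, which earlier cases already handle) must not produce an `updateBlacklist` call to the ResourceManager, while `INVALID` and an arbitrary non-zero exit code must.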

## How was this patch tested?

Added a new unit test in `YarnAllocatorSuite` ("SPARK-26269: YarnAllocator should have same blacklist behaviour with YARN") and updated the verification in `YarnAllocatorBlacklistTrackerSuite`.

Closes #23223 from Ngone51/dev-YarnAllocator-should-have-same-blacklist-behaviour-with-YARN.

Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
wuyi authored on 2018-12-21 13:21:58 -06:00, committed by Thomas Graves · commit d6a5f85984 (parent 8e76d6621a) · 4 changed files with 101 additions and 12 deletions

YarnAllocator.scala

@@ -607,13 +607,23 @@ private[yarn] class YarnAllocator(
val message = "Container killed by YARN for exceeding physical memory limits. " +
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
(true, message)
case _ =>
// all the failures which not covered above, like:
// disk failure, kill by app master or resource manager, ...
case other_exit_status =>
// SPARK-26269: follow YARN's blacklisting behaviour(see https://github
// .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
// oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
// ache/hadoop/yarn/util/Apps.java#L273 for details)
if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
(false, s"Container marked as failed: $containerId$onHostStr" +
s". Exit status: ${completedContainer.getExitStatus}" +
s". Diagnostics: ${completedContainer.getDiagnostics}.")
} else {
// completed container from a bad node
allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
(true, s"Container from a bad node: $containerId$onHostStr" +
s". Exit status: ${completedContainer.getExitStatus}" +
s". Diagnostics: ${completedContainer.getDiagnostics}.")
}
}
if (exitCausedByApp) {
@@ -739,4 +749,12 @@ private object YarnAllocator {
val MEM_REGEX = "[0-9.]+ [KMG]B"
val VMEM_EXCEEDED_EXIT_CODE = -103
val PMEM_EXCEEDED_EXIT_CODE = -104
val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
ContainerExitStatus.KILLED_BY_APPMASTER,
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
ContainerExitStatus.ABORTED,
ContainerExitStatus.DISKS_FAILED
)
}

YarnAllocatorBlacklistTracker.scala

@@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker(
if (removals.nonEmpty) {
logInfo(s"removing nodes from YARN application master's blacklist: $removals")
}
if (additions.nonEmpty || removals.nonEmpty) {
amClient.updateBlacklist(additions.asJava, removals.asJava)
}
currentBlacklistedYarnNodes = nodesToBlacklist
}

YarnAllocatorBlacklistTrackerSuite.scala

@@ -87,7 +87,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
// expired blacklisted nodes (simulating a resource request)
yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
// no change is communicated to YARN regarding the blacklisting
verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), Collections.emptyList())
}
test("combining scheduler and allocation blacklist") {

YarnAllocatorSuite.scala

@@ -17,6 +17,8 @@
package org.apache.spark.deploy.yarn
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
@@ -114,13 +116,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock)
}
def createContainer(host: String, resource: Resource = containerResource): Container = {
val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
def createContainer(
host: String,
containerNumber: Int = containerNum,
resource: Resource = containerResource): Container = {
val containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
}
def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
hosts.zip(containerIds).map{case (host, id) => createContainer(host, id)}
}
def createContainerStatus(
containerId: ContainerId,
exitStatus: Int,
containerState: ContainerState = ContainerState.COMPLETE,
diagnostics: String = "diagnostics"): ContainerStatus = {
ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
}
test("single container allocated") {
// request a single container and receive it
val handler = createAllocator(1)
@@ -148,7 +166,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
handler.updateResourceRequests()
val container = createContainer("host1", handler.resource)
val container = createContainer("host1", resource = handler.resource)
handler.handleAllocatedContainers(Array(container))
// get amount of memory and vcores from resource, so effectively skipping their validation
@@ -417,4 +435,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock.advance(50 * 1000L)
handler.getNumExecutorsFailed should be (0)
}
test("SPARK-26269: YarnAllocator should have same blacklist behaviour with YARN") {
val rmClientSpy = spy(rmClient)
val maxExecutors = 11
val handler = createAllocator(
maxExecutors,
rmClientSpy,
Map(
"spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true",
"spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
handler.updateResourceRequests()
val hosts = (0 until maxExecutors).map(i => s"host$i")
val ids = 0 to maxExecutors
val containers = createContainers(hosts, ids)
val nonBlacklistedStatuses = Seq(
ContainerExitStatus.SUCCESS,
ContainerExitStatus.PREEMPTED,
ContainerExitStatus.KILLED_EXCEEDED_VMEM,
ContainerExitStatus.KILLED_EXCEEDED_PMEM,
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
ContainerExitStatus.KILLED_BY_APPMASTER,
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
ContainerExitStatus.ABORTED,
ContainerExitStatus.DISKS_FAILED)
val nonBlacklistedContainerStatuses = nonBlacklistedStatuses.zipWithIndex.map {
case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
}
val BLACKLISTED_EXIT_CODE = 1
val blacklistedStatuses = Seq(ContainerExitStatus.INVALID, BLACKLISTED_EXIT_CODE)
val blacklistedContainerStatuses = blacklistedStatuses.zip(9 until maxExecutors).map {
case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
}
handler.handleAllocatedContainers(containers.slice(0, 9))
handler.processCompletedContainers(nonBlacklistedContainerStatuses)
verify(rmClientSpy, never())
.updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())
handler.handleAllocatedContainers(containers.slice(9, 11))
handler.processCompletedContainers(blacklistedContainerStatuses)
verify(rmClientSpy)
.updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
verify(rmClientSpy)
.updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
}
}