[SPARK-20327][YARN] Follow up: fix resource request tests on Hadoop 3.

The test fix is to allocate a `Resource` object only after the resource
types have been initialized. Otherwise the YARN classes get in a weird
state and throw a different exception than expected, because the resource
has a different view of the registered resources.

I also removed a test for a null resource, since that test seemed
unnecessary and made the fix more complicated.

All the other changes are just cleanup; basically simplify the tests by
defining what is being tested and deriving the resource type registration
and the SparkConf from that data, instead of having redundant definitions
in the tests.

Ran tests with Hadoop 3 (and also without it).

Closes #22751 from vanzin/SPARK-20327.fix.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
This commit is contained in:
Marcelo Vanzin 2018-10-17 10:40:47 -05:00 committed by Imran Rashid
parent 24f5bbd770
commit 7d425b190a
4 changed files with 93 additions and 181 deletions

View file

@@ -29,7 +29,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
import org.apache.spark.util.{CausedBy, Utils}
/**
* This helper class uses some of Hadoop 3 methods from the YARN API,
@@ -121,6 +121,8 @@ private object ResourceRequestHelper extends Logging {
case _: MatchError =>
throw new IllegalArgumentException(s"Resource request for '$name' ('$rawAmount') " +
s"does not match pattern $AMOUNT_AND_UNIT_REGEX.")
case CausedBy(e: IllegalArgumentException) =>
throw new IllegalArgumentException(s"Invalid request for $name: ${e.getMessage}")
case e: InvocationTargetException if e.getCause != null => throw e.getCause
}
}

View file

@@ -200,20 +200,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
appContext.getMaxAppAttempts should be (42)
}
test("resource request (client mode)") {
  // Client mode: custom resources are requested through the AM resource type prefix.
  val amPrefix = YARN_AM_RESOURCE_TYPES_PREFIX
  val sparkConf = new SparkConf()
    .set("spark.submit.deployMode", "client")
    .set(amPrefix + "fpga", "2")
    .set(amPrefix + "gpu", "3")
  testResourceRequest(sparkConf, List("gpu", "fpga"), Seq(("fpga", 2), ("gpu", 3)))
}
test("resource request (cluster mode)") {
  // Cluster mode: custom resources are requested through the driver resource type prefix.
  val driverPrefix = YARN_DRIVER_RESOURCE_TYPES_PREFIX
  val sparkConf = new SparkConf()
    .set("spark.submit.deployMode", "cluster")
    .set(driverPrefix + "fpga", "4")
    .set(driverPrefix + "gpu", "5")
  testResourceRequest(sparkConf, List("gpu", "fpga"), Seq(("fpga", 4), ("gpu", 5)))
}
test("spark.yarn.jars with multiple paths and globs") {
val libs = Utils.createTempDir()
val single = Utils.createTempDir()
@@ -372,6 +358,35 @@ class ClientSuite extends SparkFunSuite with Matchers {
sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
}
// One test per deploy mode: the AM prefix carries custom resources in client
// mode, the driver prefix in cluster mode.
for ((deployMode, prefix) <- Seq(
    "client" -> YARN_AM_RESOURCE_TYPES_PREFIX,
    "cluster" -> YARN_DRIVER_RESOURCE_TYPES_PREFIX)) {
  test(s"custom resource request ($deployMode mode)") {
    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
    val requested = Map("fpga" -> 2, "gpu" -> 3)
    // Register the resource types with YARN before any Resource record is built.
    ResourceRequestTestHelper.initializeResourceTypes(requested.keys.toSeq)
    val sparkConf = requested.foldLeft(
        new SparkConf().set("spark.submit.deployMode", deployMode)) {
      case (c, (resName, amount)) => c.set(prefix + resName, amount.toString)
    }
    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
    val newAppResponse = Records.newRecord(classOf[GetNewApplicationResponse])
    val launchContext = Records.newRecord(classOf[ContainerLaunchContext])
    val client = new Client(new ClientArguments(Array()), sparkConf)
    client.createApplicationSubmissionContext(
      new YarnClientApplication(newAppResponse, appContext),
      launchContext)
    // The submission context's Resource must now carry every requested amount.
    requested.foreach { case (resName, amount) =>
      ResourceRequestTestHelper.getRequestedValue(appContext.getResource, resName) should be (amount)
    }
  }
}
private val matching = Seq(
("files URI match test1", "file:///file1", "file:///file2"),
("files URI match test2", "file:///c:file1", "file://c:file2"),
@@ -447,31 +462,4 @@ class ClientSuite extends SparkFunSuite with Matchers {
populateClasspath(null, new Configuration(), client.sparkConf, env)
classpath(env)
}
/**
 * Registers the given resource types, creates an application submission context
 * from the conf, and asserts that the resulting Resource carries every expected
 * (name, amount) pair. Skipped when YARN resource types are unavailable.
 */
private def testResourceRequest(
    sparkConf: SparkConf,
    resources: List[String],
    expectedResources: Seq[(String, Long)]): Unit = {
  assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
  // Resource types must be registered before YARN records are created.
  ResourceRequestTestHelper.initializeResourceTypes(resources)
  val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
  val newAppResponse = Records.newRecord(classOf[GetNewApplicationResponse])
  val launchContext = Records.newRecord(classOf[ContainerLaunchContext])
  val client = new Client(new ClientArguments(Array()), sparkConf)
  client.createApplicationSubmissionContext(
    new YarnClientApplication(newAppResponse, appContext),
    launchContext)
  appContext.getAMContainerSpec should be (launchContext)
  appContext.getApplicationType should be ("SPARK")
  expectedResources.foreach { case (name, value) =>
    ResourceRequestTestHelper.getResourceTypeValue(appContext.getResource, name) should be (value)
  }
}
}

View file

@@ -39,42 +39,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
// A malformed amount string must be rejected with an error naming the resource.
test("resource request value does not match pattern") {
  val requests = Map(CUSTOM_RES_1 -> "**@#")
  verifySetResourceRequestsException(List(CUSTOM_RES_1), requests, CUSTOM_RES_1)
}
// An amount consisting only of a unit ("m"), with no numeric part, is invalid.
test("resource request just unit defined") {
  verifySetResourceRequestsException(Nil, Map(CUSTOM_RES_1 -> "m"), CUSTOM_RES_1)
}
// A null Resource must trip the precondition check with a clear message.
test("resource request with null value should not be allowed") {
  val expectedMessage = "requirement failed: Resource parameter should not be null!"
  verifySetResourceRequestsException(Nil, null, Map(CUSTOM_RES_1 -> "123"), expectedMessage)
}
// A numeric amount with an unknown unit suffix is rejected; no particular
// message is asserted here (empty expected-message string).
test("resource request with valid value and invalid unit") {
  val requests = Map(CUSTOM_RES_1 -> "123ppp")
  verifySetResourceRequestsException(List(CUSTOM_RES_1), createResource, requests, "")
}
// A bare number is accepted and stored with an empty unit string.
test("resource request with valid value and without unit") {
  val expected = ResourceInformation(CUSTOM_RES_1, 123, "")
  verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "123"),
    Map(CUSTOM_RES_1 -> expected))
}
// "2g" is parsed into value 2 with unit "G".
test("resource request with valid value and unit") {
  val expected = ResourceInformation(CUSTOM_RES_1, 2, "G")
  verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "2g"),
    Map(CUSTOM_RES_1 -> expected))
}
// Multiple resources in a single call are parsed independently, each keeping its own unit.
test("two resource requests with valid values and units") {
  val requests = Map(CUSTOM_RES_1 -> "123m", CUSTOM_RES_2 -> "10G")
  val expected = Map(
    CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "m"),
    CUSTOM_RES_2 -> ResourceInformation(CUSTOM_RES_2, 10, "G"))
  verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1, CUSTOM_RES_2), requests, expected)
}
test("empty SparkConf should be valid") {
val sparkConf = new SparkConf()
ResourceRequestHelper.validateResources(sparkConf)
@@ -89,61 +53,68 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
ResourceRequestHelper.validateResources(sparkConf)
}
test("memory defined with new config for executor") {
val sparkConf = new SparkConf()
sparkConf.set(NEW_CONFIG_EXECUTOR_MEMORY, "30G")
verifyValidateResourcesException(sparkConf, NEW_CONFIG_EXECUTOR_MEMORY)
// Each entry names a scenario and lists the ResourceInformation values that
// setResourceRequests() is expected to store on the Resource.
for ((name, resources) <- Seq(
    "value with unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 2, "G")),
    "value without unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "")),
    "multiple resources" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "m"),
      ResourceInformation(CUSTOM_RES_2, 10, "G")))) {
  test(s"valid request: $name") {
    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
    // Register the types first; only then is it safe to build a Resource.
    ResourceRequestTestHelper.initializeResourceTypes(resources.map(_.name))
    val requests = resources.map(r => r.name -> (r.value.toString + r.unit)).toMap
    val resource = createResource()
    ResourceRequestHelper.setResourceRequests(requests, resource)
    for (r <- resources) {
      val stored = ResourceRequestTestHelper.getResourceInformationByName(resource, r.name)
      assert(stored === r)
    }
  }
}
test("memory defined with new config for executor 2") {
val sparkConf = new SparkConf()
sparkConf.set(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb", "30G")
verifyValidateResourcesException(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb")
// Invalid amount strings: each scenario must raise IllegalArgumentException
// whose message mentions the offending resource name.
for ((name, key, value) <- Seq(
    ("value does not match pattern", CUSTOM_RES_1, "**@#"),
    ("only unit defined", CUSTOM_RES_1, "m"),
    ("invalid unit", CUSTOM_RES_1, "123ppp"))) {
  test(s"invalid request: $name") {
    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
    ResourceRequestTestHelper.initializeResourceTypes(Seq(key))
    val resource = createResource()
    val err = intercept[IllegalArgumentException] {
      ResourceRequestHelper.setResourceRequests(Map(key -> value), resource)
    }
    err.getMessage should include (key)
  }
}
test("memory defined with new config for executor 3") {
val sparkConf = new SparkConf()
sparkConf.set(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb", "30G")
verifyValidateResourcesException(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb")
// Standard memory/cpu settings must not be smuggled in through the custom
// resource type configs; validateResources() must reject each of these keys.
for ((key, value) <- Seq(
    NEW_CONFIG_EXECUTOR_MEMORY -> "30G",
    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb" -> "30G",
    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb" -> "30G",
    NEW_CONFIG_EXECUTOR_CORES -> "5",
    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores" -> "5",
    NEW_CONFIG_AM_MEMORY -> "1G",
    NEW_CONFIG_DRIVER_MEMORY -> "1G",
    NEW_CONFIG_AM_CORES -> "3",
    NEW_CONFIG_DRIVER_CORES -> "1G")) {
  test(s"disallowed resource request: $key") {
    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
    val sparkConf = new SparkConf(false).set(key, value)
    val err = intercept[SparkException] {
      ResourceRequestHelper.validateResources(sparkConf)
    }
    err.getMessage should include (key)
  }
}
// Executor cores must not be set through the custom resource config key.
test("cores defined with new config for executor") {
  val sparkConf = new SparkConf().set(NEW_CONFIG_EXECUTOR_CORES, "5")
  verifyValidateResourcesException(sparkConf, NEW_CONFIG_EXECUTOR_CORES)
}
// "vcores" spelled directly under the executor resource prefix is equally disallowed.
test("cores defined with new config for executor 2") {
  val key = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores"
  val sparkConf = new SparkConf().set(key, "5")
  verifyValidateResourcesException(sparkConf, key)
}
// AM memory must not be set through the custom resource config key (client mode).
test("memory defined with new config, client mode") {
  val sparkConf = new SparkConf().set(NEW_CONFIG_AM_MEMORY, "1G")
  verifyValidateResourcesException(sparkConf, NEW_CONFIG_AM_MEMORY)
}
// Driver memory must not be set through the custom resource config key (cluster mode).
test("memory defined with new config for driver, cluster mode") {
  val sparkConf = new SparkConf().set(NEW_CONFIG_DRIVER_MEMORY, "1G")
  verifyValidateResourcesException(sparkConf, NEW_CONFIG_DRIVER_MEMORY)
}
// AM cores must not be set through the custom resource config key (client mode).
test("cores defined with new config, client mode") {
  val sparkConf = new SparkConf().set(NEW_CONFIG_AM_CORES, "3")
  verifyValidateResourcesException(sparkConf, NEW_CONFIG_AM_CORES)
}
// Driver cores must not be set through the custom resource config key (cluster mode).
test("cores defined with new config for driver, cluster mode") {
  val sparkConf = new SparkConf().set(NEW_CONFIG_DRIVER_CORES, "1G")
  verifyValidateResourcesException(sparkConf, NEW_CONFIG_DRIVER_CORES)
}
test("various duplicated definitions") {
test("multiple disallowed resources in config") {
val sparkConf = new SparkConf()
sparkConf.set(DRIVER_MEMORY.key, "2G")
sparkConf.set(DRIVER_CORES.key, "2")
@@ -163,52 +134,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
include(NEW_CONFIG_DRIVER_MEMORY))
}
/**
 * Registers the given resource types, applies the requests to a fresh Resource,
 * and asserts that every expected ResourceInformation was stored on it.
 * Skipped when YARN resource type support is unavailable.
 */
private def verifySetResourceRequestsSuccessful(
    definedResourceTypes: List[String],
    resourceRequests: Map[String, String],
    expectedResources: Map[String, ResourceInformation]): Unit = {
  assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
  ResourceRequestTestHelper.initializeResourceTypes(definedResourceTypes)
  val resource = createResource()
  ResourceRequestHelper.setResourceRequests(resourceRequests, resource)
  for ((name, expected) <- expectedResources) {
    val actual = ResourceRequestTestHelper.getResourceInformationByName(resource, name)
    assert(actual === expected)
  }
}
/**
 * Convenience overload: builds a fresh Resource and delegates to the
 * four-argument variant.
 */
private def verifySetResourceRequestsException(
    definedResourceTypes: List[String],
    resourceRequests: Map[String, String],
    message: String): Unit = {
  verifySetResourceRequestsException(definedResourceTypes, createResource(), resourceRequests,
    message)
}
/**
 * Registers the given resource types and asserts that applying the requests to
 * `resource` throws IllegalArgumentException. When `message` is non-empty, the
 * exception message must contain it.
 */
private def verifySetResourceRequestsException(
    definedResourceTypes: List[String],
    resource: Resource,
    resourceRequests: Map[String, String],
    message: String) = {
  assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
  ResourceRequestTestHelper.initializeResourceTypes(definedResourceTypes)
  val thrown = intercept[IllegalArgumentException] {
    ResourceRequestHelper.setResourceRequests(resourceRequests, resource)
  }
  if (message.nonEmpty) {
    thrown.getMessage should include (message)
  }
}
/** Asserts that validateResources() fails for the conf, mentioning `message`. */
private def verifyValidateResourcesException(sparkConf: SparkConf, message: String) = {
  val err = intercept[SparkException] {
    ResourceRequestHelper.validateResources(sparkConf)
  }
  err.getMessage should include (message)
}
private def createResource(): Resource = {
val resource = Records.newRecord(classOf[Resource])
resource.setMemory(512)

View file

@@ -18,20 +18,18 @@
package org.apache.spark.deploy.yarn
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.yarn.api.records.Resource
import org.apache.spark.util.Utils
object ResourceRequestTestHelper {
def initializeResourceTypes(resourceTypes: List[String]): Unit = {
def initializeResourceTypes(resourceTypes: Seq[String]): Unit = {
if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
throw new IllegalStateException("This method should not be invoked " +
"since YARN resource types is not available because of old Hadoop version!" )
}
val allResourceTypes = new ListBuffer[AnyRef]
// ResourceUtils.reinitializeResources() is the YARN-way
// to specify resources for the execution of the tests.
// This method should receive standard resources with names of memory-mb and vcores.
@@ -42,8 +40,7 @@ object ResourceRequestTestHelper {
createResourceTypeInfo("memory-mb"),
createResourceTypeInfo("vcores"))
val customResourceTypes = resourceTypes.map(createResourceTypeInfo)
allResourceTypes ++= defaultResourceTypes
allResourceTypes ++= customResourceTypes
val allResourceTypes = defaultResourceTypes ++ customResourceTypes
val resourceUtilsClass =
Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
@@ -58,8 +55,8 @@ object ResourceRequestTestHelper {
resTypeInfoNewInstanceMethod.invoke(null, resourceName)
}
def getResourceTypeValue(res: Resource, name: String): AnyRef = {
val resourceInformation = getResourceInformation(res, name)
/** Looks up the resource information registered for `rtype` on `res` and extracts its value. */
def getRequestedValue(res: Resource, rtype: String): AnyRef =
  invokeMethod(getResourceInformation(res, rtype), "getValue")
@@ -88,5 +85,5 @@ object ResourceRequestTestHelper {
getValueMethod.invoke(resourceInformation)
}
case class ResourceInformation(name: String, value: Long, units: String)
// Simple value holder for a resource's name, amount, and unit, used by the tests
// to state expectations about what was stored on a YARN Resource.
case class ResourceInformation(name: String, value: Long, unit: String)
}