[SPARK-27378][YARN] YARN support for GPU-aware scheduling

## What changes were proposed in this pull request?

Add YARN support for GPU-aware scheduling. Since SPARK-20327 already added custom YARN resource support, this JIRA mainly ensures that the Spark resource configs get mapped into the corresponding YARN resource configs, and that the user doesn't specify both the YARN and the Spark config for the known resource types (GPU and FPGA are the known types on YARN).

More details on the design and requirements are documented at https://issues.apache.org/jira/browse/SPARK-27376

Note that the Running on YARN docs already state that using custom resource types requires YARN 3.0+. We will add any further documentation under SPARK-20327.
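
As an illustration, a hedged sketch that is not part of the patch (it assumes `SPARK_DRIVER_RESOURCE_PREFIX`/`SPARK_EXECUTOR_RESOURCE_PREFIX` resolve to `spark.driver.resource.`/`spark.executor.resource.` and `SPARK_RESOURCE_COUNT_SUFFIX` to `.count`, per the SPARK-27376 design), showing that the user now only sets the Spark-level configs:

```scala
import org.apache.spark.SparkConf

// Request GPUs through the generic Spark resource configs only...
val conf = new SparkConf()
  .set("spark.driver.resource.gpu.count", "1")
  .set("spark.executor.resource.gpu.count", "2")

// ...and the YARN backend now derives the equivalent of
//   spark.yarn.driver.resource.yarn.io/gpu = 1
//   spark.yarn.executor.resource.yarn.io/gpu = 2
// Setting both forms for gpu or fpga now fails validation (see the
// ClientSuite test below for the exact error message).
```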

## How was this patch tested?

Unit tests and manual testing on a YARN cluster.

Closes #24634 from tgravescs/SPARK-27361.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Commit 0ced4c0b13 (parent 955eef95b3), committed 2019-05-30 13:23:46 -07:00 by Marcelo Vanzin.
7 changed files with 132 additions and 6 deletions.

File: Client.scala

@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -238,12 +239,15 @@ private[spark] class Client(
   def createApplicationSubmissionContext(
       newApp: YarnClientApplication,
       containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
-    val amResources =
+    val yarnAMResources =
       if (isClusterMode) {
         sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap
       } else {
         sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
       }
+    val amResources = yarnAMResources ++
+      getYarnResourcesFromSparkResources(SPARK_DRIVER_RESOURCE_PREFIX, sparkConf)
     logDebug(s"AM resources: $amResources")
     val appContext = newApp.getApplicationSubmissionContext
     appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))

File: ResourceRequestHelper.scala

@@ -26,6 +26,7 @@ import scala.util.Try
 import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -66,7 +67,16 @@ private object ResourceRequestHelper extends Logging {
     (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores"),
     (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
     (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
-    (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"))
+    (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
+    (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+      s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
+    (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+      s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
+    (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+      s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
+    (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+      s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))
   val errorMessage = new mutable.StringBuilder()
   resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
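
To make the validation above concrete, here is a hedged sketch of what it rejects (`validateResources` is package-private to `org.apache.spark.deploy.yarn`, so the call is shown as a comment; the error text is taken from the ClientSuite test below):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.resource.gpu.count", "3")         // the supported Spark config
  .set("spark.yarn.driver.resource.yarn.io/gpu", "2")  // now a disallowed duplicate

// Inside the yarn package:
//   ResourceRequestHelper.validateResources(conf)
// throws a SparkException whose message contains:
//   "Do not use spark.yarn.driver.resource.yarn.io/gpu,
//    please use spark.driver.resource.gpu"
```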

File: YarnAllocator.scala

@@ -142,7 +142,8 @@ private[yarn] class YarnAllocator(
   protected val executorCores = sparkConf.get(EXECUTOR_CORES)
   private val executorResourceRequests =
-    sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap
+    sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++
+      getYarnResourcesFromSparkResources(SPARK_EXECUTOR_RESOURCE_PREFIX, sparkConf)
   // Resource capability requested for each executor
   private[yarn] val resource: Resource = {
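
Note the merge order above: the Spark-derived resources sit on the right of `++`, so on a key collision the derived value wins over one set directly via `spark.yarn.executor.resource.*` (validation forbids that combination for gpu/fpga anyway). A small sketch of the `Map` semantics being relied on:

```scala
// Scala's Map ++ keeps the right-hand value on key collisions.
val fromYarnConfs  = Map("yarn.io/gpu" -> "2", "yarn.io/madeup" -> "5")
val fromSparkConfs = Map("yarn.io/gpu" -> "3")
println(fromYarnConfs ++ fromSparkConfs)
// Map(yarn.io/gpu -> 3, yarn.io/madeup -> 5)
```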

File: YarnSparkHadoopUtil.scala

@@ -25,7 +25,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
 import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.util.Utils
@@ -39,6 +40,25 @@ object YarnSparkHadoopUtil {
   val MEMORY_OVERHEAD_MIN = 384L
   val ANY_HOST = "*"
+  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
+  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
+
+  /**
+   * Convert Spark resources into YARN resources.
+   * The only resources we know how to map from Spark configs to YARN configs are
+   * GPUs and FPGAs; everything else the user has to specify in both the
+   * spark.yarn.*.resource and the spark.*.resource configs.
+   */
+  private[yarn] def getYarnResourcesFromSparkResources(
+      confPrefix: String,
+      sparkConf: SparkConf): Map[String, String] = {
+    Map("gpu" -> YARN_GPU_RESOURCE_CONFIG, "fpga" -> YARN_FPGA_RESOURCE_CONFIG).map {
+      case (rName, yarnName) =>
+        val resourceCountSparkConf = s"${confPrefix}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+        (yarnName -> sparkConf.get(resourceCountSparkConf, "0"))
+    }.filter { case (_, count) => count.toLong > 0 }
+  }
+
   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
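
A self-contained sketch of what `getYarnResourcesFromSparkResources` computes, with the config constants inlined as literal strings for illustration (assumed values: prefix `spark.executor.resource.`, suffix `.count`):

```scala
import org.apache.spark.SparkConf

object YarnResourceMappingSketch {
  // Inlined copy of the mapping above, for illustration only.
  def mapResources(confPrefix: String, sparkConf: SparkConf): Map[String, String] = {
    Map("gpu" -> "yarn.io/gpu", "fpga" -> "yarn.io/fpga").map { case (rName, yarnName) =>
      yarnName -> sparkConf.get(s"${confPrefix}${rName}.count", "0")
    }.filter { case (_, count) => count.toLong > 0 }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.executor.resource.gpu.count", "2")
    // Prints Map(yarn.io/gpu -> 2); fpga is dropped because its count defaults to 0.
    println(mapResources("spark.executor.resource.", conf))
  }
}
```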

File: ClientSuite.scala

@@ -38,7 +38,8 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq}
 import org.mockito.Mockito.{spy, verify}
 import org.scalatest.Matchers
-import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
@@ -389,6 +390,62 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("custom resource request yarn config and spark config fails") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.keys.foreach { yarnName =>
+      conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+    }
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+    }
+
+    val error = intercept[SparkException] {
+      ResourceRequestHelper.validateResources(conf)
+    }.getMessage()
+
+    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," +
+      " please use spark.driver.resource.fpga"))
+    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," +
+      " please use spark.driver.resource.gpu"))
+  }
+
+  test("custom resources spark config mapped to yarn config") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val yarnMadeupResource = "yarn.io/madeup"
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu",
+      YARN_FPGA_RESOURCE_CONFIG -> "fpga",
+      yarnMadeupResource -> "madeup")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+    }
+    // also set a yarn-only resource type that we don't convert
+    conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
+
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(new ClientArguments(Array()), conf, null)
+    val newContext = client.createApplicationSubmissionContext(
+      new YarnClientApplication(getNewApplicationResponse, appContext),
+      containerLaunchContext)
+
+    val yarnRInfo = ResourceRequestTestHelper.getResources(newContext.getResource)
+    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
+    assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),

File: ResourceRequestTestHelper.scala

@@ -85,5 +85,16 @@ object ResourceRequestTestHelper {
     getValueMethod.invoke(resourceInformation)
   }
 
+  def getResources(res: Resource): Array[ResourceInformation] = {
+    val getResourceInformationMethod = res.getClass.getMethod("getResources")
+    val rInfoArray = getResourceInformationMethod.invoke(res).asInstanceOf[Array[AnyRef]]
+    rInfoArray.map { rInfo =>
+      val name = invokeMethod(rInfo, "getName").asInstanceOf[String]
+      val value = invokeMethod(rInfo, "getValue").asInstanceOf[Long]
+      val units = invokeMethod(rInfo, "getUnits").asInstanceOf[String]
+      ResourceInformation(name, value, units)
+    }
+  }
+
   case class ResourceInformation(name: String, value: Long, unit: String)
 }

File: YarnAllocatorSuite.scala

@@ -21,7 +21,6 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -182,6 +181,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(containerRequest.getCapability === expectedResources)
   }
 
+  test("custom spark resource mapped to yarn resource configs") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val yarnMadeupResource = "yarn.io/madeup"
+    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource)
+    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+    val sparkResources =
+      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}" -> "3",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}" -> "2",
+        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
+    val handler = createAllocator(1, mockAmClient, sparkResources)
+
+    handler.updateResourceRequests()
+    val yarnRInfo = ResourceRequestTestHelper.getResources(handler.resource)
+    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).get === 2)
+    assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
+    assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+  }
+
   test("container should not be created if requested number if met") {
     // request a single container and receive it
     val handler = createAllocator(1)