From 0ced4c0b132334dac27de3ce45fd4cab8261cfb5 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Thu, 30 May 2019 13:23:46 -0700
Subject: [PATCH] [SPARK-27378][YARN] YARN support for GPU-aware scheduling

## What changes were proposed in this pull request?

Add YARN support for GPU-aware scheduling. Since SPARK-20327 already added
YARN custom resource support, this JIRA is really just about making sure the
Spark resource configs get mapped into the YARN resource configs, and that
the user doesn't specify both the YARN and the Spark config for the known
resource types (gpu and fpga are the known types on YARN).

More details on the design and requirements are documented at:
https://issues.apache.org/jira/browse/SPARK-27376

Note that the running-on-YARN docs already state that using this requires
YARN 3.0+. We will add any further documentation under SPARK-20327.

## How was this patch tested?

Unit tests and manual testing on a YARN cluster.

Closes #24634 from tgravescs/SPARK-27361.

Authored-by: Thomas Graves
Signed-off-by: Marcelo Vanzin
---
 .../org/apache/spark/deploy/yarn/Client.scala |  6 +-
 .../deploy/yarn/ResourceRequestHelper.scala   | 12 +++-
 .../spark/deploy/yarn/YarnAllocator.scala     |  3 +-
 .../deploy/yarn/YarnSparkHadoopUtil.scala     | 22 ++++++-
 .../spark/deploy/yarn/ClientSuite.scala       | 59 ++++++++++++++++++-
 .../yarn/ResourceRequestTestHelper.scala      | 11 ++++
 .../deploy/yarn/YarnAllocatorSuite.scala      | 25 +++++++-
 7 files changed, 132 insertions(+), 6 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e0dba8c21b..b26e56a68f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -238,12 +239,15 @@ private[spark] class Client(
   def createApplicationSubmissionContext(
       newApp: YarnClientApplication,
       containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
-    val amResources =
+
+    val yarnAMResources =
       if (isClusterMode) {
         sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap
       } else {
         sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
       }
+    val amResources = yarnAMResources ++
+      getYarnResourcesFromSparkResources(SPARK_DRIVER_RESOURCE_PREFIX, sparkConf)
     logDebug(s"AM resources: $amResources")
     val appContext = newApp.getApplicationSubmissionContext
     appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
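As a concrete illustration of the merge above: a minimal, self-contained sketch using plain maps in place of `SparkConf`. The `yarn.io/madeup` key and the literal counts are illustrative values taken from the tests later in this patch, not anything the production code defines.

```scala
// Mirrors the `yarnAMResources ++ getYarnResourcesFromSparkResources(...)` merge
// in createApplicationSubmissionContext, with plain maps standing in for SparkConf.
object AmResourceMergeSketch extends App {
  // Resources the user requested directly through spark.yarn.*.resource.* configs.
  val yarnAMResources = Map("yarn.io/madeup" -> "5")
  // Resources derived from the Spark-level gpu/fpga configs.
  val fromSparkConfigs = Map("yarn.io/gpu" -> "2")
  // Plain concatenation is safe here: validateResources (next file) rejects
  // configs that name the same known resource on both sides.
  val amResources = yarnAMResources ++ fromSparkConfigs
  println(amResources) // Map(yarn.io/madeup -> 5, yarn.io/gpu -> 2)
}
```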
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 012268ea85..66e47819b0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 import org.apache.hadoop.yarn.api.records.Resource

 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -66,7 +67,16 @@ private object ResourceRequestHelper extends Logging {
       (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores"),
       (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
-      (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"))
+      (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))
+
     val errorMessage = new mutable.StringBuilder()

     resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1dc9d49f17..069b28797e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -142,7 +142,8 @@ private[yarn] class YarnAllocator(
   protected val executorCores = sparkConf.get(EXECUTOR_CORES)

   private val executorResourceRequests =
-    sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap
+    sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++
+    getYarnResourcesFromSparkResources(SPARK_EXECUTOR_RESOURCE_PREFIX, sparkConf)

   // Resource capability requested for each executor
   private[yarn] val resource: Resource = {
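The new `resourceDefinitions` entries feed the same duplicate-config check that already guards memory and cores. Below is a rough sketch of that check under stated assumptions: the literal keys are the presumed expansions of the prefix/suffix constants, and the real `validateResources` throws `SparkException` with a similar message (see the `ClientSuite` test later in this patch). It is a reconstruction, not the actual Spark implementation.

```scala
// Sketch of the duplicate-config validation; assumed key expansions, not Spark code.
object ValidateResourcesSketch extends App {
  // User configs: the spark-level gpu count plus the disallowed yarn-level twin.
  val conf = Map(
    "spark.driver.resource.gpu.count" -> "3",
    "spark.yarn.driver.resource.yarn.io/gpu" -> "2")

  // (spark-level key, equivalent yarn-level key), as in resourceDefinitions.
  val resourceDefinitions = Seq(
    ("spark.driver.resource.gpu.count", "spark.yarn.driver.resource.yarn.io/gpu"),
    ("spark.driver.resource.fpga.count", "spark.yarn.driver.resource.yarn.io/fpga"))

  val errorMessage = new StringBuilder
  resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
    // Setting the yarn-level key for a known resource type is flagged.
    if (conf.contains(resourceRequest)) {
      errorMessage.append(s"Do not use $resourceRequest, please use $sparkName. ")
    }
  }
  // The real code raises SparkException carrying this message.
  if (errorMessage.nonEmpty) println(errorMessage.toString.trim)
}
```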
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index b904687daf..7c8ee031b1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -25,7 +25,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
 import org.apache.hadoop.yarn.util.ConverterUtils

-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.util.Utils

@@ -39,6 +40,25 @@ object YarnSparkHadoopUtil {
   val MEMORY_OVERHEAD_MIN = 384L

   val ANY_HOST = "*"

+  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
+  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
+
+  /**
+   * Convert Spark resources into YARN resources.
+   * The only resources we know how to map from Spark configs to YARN configs are
+   * gpus and fpgas; for everything else the user has to specify both the
+   * spark.yarn.*.resource and the spark.*.resource configs.
+   */
+  private[yarn] def getYarnResourcesFromSparkResources(
+      confPrefix: String,
+      sparkConf: SparkConf
+  ): Map[String, String] = {
+    Map("gpu" -> YARN_GPU_RESOURCE_CONFIG, "fpga" -> YARN_FPGA_RESOURCE_CONFIG).map {
+      case (rName, yarnName) =>
+        val resourceCountSparkConf = s"${confPrefix}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+        (yarnName -> sparkConf.get(resourceCountSparkConf, "0"))
+    }.filter { case (_, count) => count.toLong > 0 }
+  }
   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
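A hypothetical usage of the new helper. `getYarnResourcesFromSparkResources` is `private[yarn]`, so a caller would have to live in `org.apache.spark.deploy.yarn`; the literal prefix string and the `.count` suffix below are assumed expansions of `SPARK_EXECUTOR_RESOURCE_PREFIX` and `SPARK_RESOURCE_COUNT_SUFFIX`, as suggested by the test configs in this patch.

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf

object GetYarnResourcesSketch extends App {
  // Only gpu is set; fpga falls back to the "0" default and is dropped by
  // the count > 0 filter inside the helper.
  val conf = new SparkConf()
    .set("spark.executor.resource.gpu.count", "2")

  val mapped = YarnSparkHadoopUtil.getYarnResourcesFromSparkResources(
    "spark.executor.resource.", conf)
  println(mapped) // expected: Map(yarn.io/gpu -> 2)
}
```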
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 5ff8e06331..2c2c237617 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -38,7 +38,8 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq}
 import org.mockito.Mockito.{spy, verify}
 import org.scalatest.Matchers

-import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
@@ -389,6 +390,62 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }

+  test(s"custom resource request yarn config and spark config fails") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.keys.foreach { yarnName =>
+      conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+    }
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+    }
+
+    val error = intercept[SparkException] {
+      ResourceRequestHelper.validateResources(conf)
+    }.getMessage()
+
+    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," +
+      " please use spark.driver.resource.fpga"))
+    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," +
+      " please use spark.driver.resource.gpu"))
+  }
+
+  test(s"custom resources spark config mapped to yarn config") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val yarnMadeupResource = "yarn.io/madeup"
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu",
+      YARN_FPGA_RESOURCE_CONFIG -> "fpga",
+      yarnMadeupResource -> "madeup")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+    }
+    // also just set yarn one that we don't convert
+    conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(new ClientArguments(Array()), conf, null)
+    val newContext = client.createApplicationSubmissionContext(
+      new YarnClientApplication(getNewApplicationResponse, appContext),
+      containerLaunchContext)
+
+    val yarnRInfo = ResourceRequestTestHelper.getResources(newContext.getResource)
+    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
+    assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
index 953d447bf4..796e099032 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
@@ -85,5 +85,16 @@ object ResourceRequestTestHelper {
     getValueMethod.invoke(resourceInformation)
   }

+  def getResources(res: Resource): Array[ResourceInformation] = {
+    val getResourceInformationMethod = res.getClass.getMethod("getResources")
+    val rInfoArray = getResourceInformationMethod.invoke(res).asInstanceOf[Array[AnyRef]]
+    rInfoArray.map { rInfo =>
+      val name = invokeMethod(rInfo, "getName").asInstanceOf[String]
+      val value = invokeMethod(rInfo, "getValue").asInstanceOf[Long]
+      val units = invokeMethod(rInfo, "getUnits").asInstanceOf[String]
+      ResourceInformation(name, value, units)
+    }
+  }
+
   case class ResourceInformation(name: String, value: Long, unit: String)
 }
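`getResources` goes through reflection because `Resource#getResources` and YARN's `ResourceInformation` exist only in the Hadoop 3.x API, while this module also compiles against older Hadoop versions (the PR description notes YARN 3.0+ is required at runtime). Below is a minimal standalone sketch of the same invoke-by-name pattern; `Named` is a stand-in class for illustration, not a Spark or Hadoop type.

```scala
object ReflectionSketch extends App {
  // Same shape as ResourceRequestTestHelper.invokeMethod: call a zero-arg
  // method by name without a compile-time dependency on the target class.
  def invokeMethod(obj: AnyRef, methodName: String): AnyRef =
    obj.getClass.getMethod(methodName).invoke(obj)

  case class Named(n: String) { def getName: String = n }
  assert(invokeMethod(Named("yarn.io/gpu"), "getName") == "yarn.io/gpu")
}
```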
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 59291af72e..b16464cc0b 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -21,7 +21,6 @@ import java.util.Collections

 import scala.collection.JavaConverters._

-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -182,6 +181,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(containerRequest.getCapability === expectedResources)
   }

+  test("custom spark resource mapped to yarn resource configs") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val yarnMadeupResource = "yarn.io/madeup"
+    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource)
+    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+
+    val sparkResources =
+      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}" -> "3",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}" -> "2",
+        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
+    val handler = createAllocator(1, mockAmClient, sparkResources)
+
+    handler.updateResourceRequests()
+    val yarnRInfo = ResourceRequestTestHelper.getResources(handler.resource)
+    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).get === 2)
+    assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
+    assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+  }
+
   test("container should not be created if requested number if met") {
     // request a single container and receive it
     val handler = createAllocator(1)