diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e0dba8c21b..b26e56a68f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -238,12 +239,15 @@ private[spark] class Client(
   def createApplicationSubmissionContext(
       newApp: YarnClientApplication,
       containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
-    val amResources =
+
+    val yarnAMResources =
       if (isClusterMode) {
         sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap
       } else {
         sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
       }
+    val amResources = yarnAMResources ++
+      getYarnResourcesFromSparkResources(SPARK_DRIVER_RESOURCE_PREFIX, sparkConf)
     logDebug(s"AM resources: $amResources")
     val appContext = newApp.getApplicationSubmissionContext
     appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 012268ea85..66e47819b0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 import org.apache.hadoop.yarn.api.records.Resource
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -66,7 +67,16 @@ private object ResourceRequestHelper extends Logging {
       (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores"),
       (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
-      (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"))
+      (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
+      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
+      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}",
+        s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))
+
     val errorMessage = new mutable.StringBuilder()
 
     resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1dc9d49f17..069b28797e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -142,7 +142,8 @@ private[yarn] class YarnAllocator(
   protected val executorCores = sparkConf.get(EXECUTOR_CORES)
 
   private val executorResourceRequests =
-    sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap
+    sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++
+      getYarnResourcesFromSparkResources(SPARK_EXECUTOR_RESOURCE_PREFIX, sparkConf)
 
   // Resource capability requested for each executor
   private[yarn] val resource: Resource = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index b904687daf..7c8ee031b1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -25,7 +25,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
 import org.apache.hadoop.yarn.util.ConverterUtils
 
-import org.apache.spark.SecurityManager
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.util.Utils
 
@@ -39,6 +40,25 @@ object YarnSparkHadoopUtil {
   val MEMORY_OVERHEAD_MIN = 384L
 
   val ANY_HOST = "*"
 
+  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
+  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
+
+  /**
+   * Convert Spark resources into YARN resources.
+   * The only resources we know how to map from spark configs to yarn configs are
+   * gpus and fpgas; everything else the user has to specify in both the
+   * spark.yarn.*.resource and the spark.*.resource configs.
+   */
+  private[yarn] def getYarnResourcesFromSparkResources(
+      confPrefix: String,
+      sparkConf: SparkConf
+    ): Map[String, String] = {
+    Map("gpu" -> YARN_GPU_RESOURCE_CONFIG, "fpga" -> YARN_FPGA_RESOURCE_CONFIG).map {
+      case (rName, yarnName) =>
+        val resourceCountSparkConf = s"${confPrefix}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}"
+        (yarnName -> sparkConf.get(resourceCountSparkConf, "0"))
+    }.filter { case (_, count) => count.toLong > 0 }
+  }
 
   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 5ff8e06331..2c2c237617 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -38,7 +38,8 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq}
 import org.mockito.Mockito.{spy, verify}
 import org.scalatest.Matchers
 
-import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
@@ -389,6 +390,62 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("custom resource request yarn config and spark config fails") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu", YARN_FPGA_RESOURCE_CONFIG -> "fpga")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.keys.foreach { yarnName =>
+      conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+    }
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+    }
+
+    val error = intercept[SparkException] {
+      ResourceRequestHelper.validateResources(conf)
+    }.getMessage()
+
+    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," +
+      " please use spark.driver.resource.fpga"))
+    assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," +
+      " please use spark.driver.resource.gpu"))
+  }
+
+  test("custom resources spark config mapped to yarn config") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val yarnMadeupResource = "yarn.io/madeup"
+    val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "gpu",
+      YARN_FPGA_RESOURCE_CONFIG -> "fpga",
+      yarnMadeupResource -> "madeup")
+    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+    val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
+    resources.values.foreach { rName =>
+      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_COUNT_SUFFIX}", "3")
+    }
+    // also set a yarn-only resource type that has no spark-side mapping
+    conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
+    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+    val client = new Client(new ClientArguments(Array()), conf, null)
+    val newContext = client.createApplicationSubmissionContext(
+      new YarnClientApplication(getNewApplicationResponse, appContext),
+      containerLaunchContext)
+
+    val yarnRInfo = ResourceRequestTestHelper.getResources(newContext.getResource)
+    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.name -> rInfo.value)).toMap
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).nonEmpty)
+    assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).get === 3)
+    assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
+    assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
index 953d447bf4..796e099032 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
@@ -85,5 +85,16 @@ object ResourceRequestTestHelper {
     getValueMethod.invoke(resourceInformation)
   }
 
+  def getResources(res: Resource): Array[ResourceInformation] = {
+    val getResourceInformationMethod = res.getClass.getMethod("getResources")
+    val rInfoArray = getResourceInformationMethod.invoke(res).asInstanceOf[Array[AnyRef]]
+    rInfoArray.map { rInfo =>
+      val name = invokeMethod(rInfo, "getName").asInstanceOf[String]
+      val value = invokeMethod(rInfo, "getValue").asInstanceOf[Long]
+      val units = invokeMethod(rInfo, "getUnits").asInstanceOf[String]
+      ResourceInformation(name, value, units)
+    }
+  }
+
   case class ResourceInformation(name: String, value: Long, unit: String)
 }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 59291af72e..b16464cc0b 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -21,7 +21,6 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -182,6 +181,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(containerRequest.getCapability === expectedResources)
   }
 
+  test("custom spark resource mapped to yarn resource configs") {
+    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+    val yarnMadeupResource = "yarn.io/madeup"
+    val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource)
+    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
+    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+
+    val sparkResources =
+      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_COUNT_SUFFIX}" -> "3",
+        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_COUNT_SUFFIX}" -> "2",
s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5") + val handler = createAllocator(1, mockAmClient, sparkResources) + + handler.updateResourceRequests() + val yarnRInfo = ResourceRequestTestHelper.getResources(handler.resource) + val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.name -> rInfo.value) ).toMap + assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).nonEmpty) + assert(allResourceInfo.get(YARN_GPU_RESOURCE_CONFIG).get === 3) + assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).nonEmpty) + assert(allResourceInfo.get(YARN_FPGA_RESOURCE_CONFIG).get === 2) + assert(allResourceInfo.get(yarnMadeupResource).nonEmpty) + assert(allResourceInfo.get(yarnMadeupResource).get === 5) + } + test("container should not be created if requested number if met") { // request a single container and receive it val handler = createAllocator(1)