[SPARK-6470] [YARN] Add support for YARN node labels.

This is difficult to write a test for because it relies on the latest version of YARN, but I verified manually that the patch does pass along the label expression on this version and containers are successfully launched.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #5242 from sryza/sandy-spark-6470 and squashes the following commits:

6af87b9 [Sandy Ryza] Change info to warning
6e22d99 [Sandy Ryza] [YARN] SPARK-6470.  Add support for YARN node labels.
This commit is contained in:
Sandy Ryza 2015-05-11 12:09:39 -07:00
parent 0a4844f90a
commit 82fee9d9aa
2 changed files with 39 additions and 1 deletions

View file

@ -220,6 +220,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
Otherwise, the client process will exit after submission.
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.nodeLabelExpression</code></td>
<td>(none)</td>
<td>
A YARN node label expression that restricts the set of nodes executors will be scheduled on.
Only versions of YARN greater than or equal to 2.6 support node label expressions, so when
running against earlier versions, this property will be ignored.
</td>
</tr>
</table>
# Launching Spark on YARN

View file

@ -117,6 +117,24 @@ private[yarn] class YarnAllocator(
// For testing
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression")
// ContainerRequest constructor that can take a node label expression. We grab it through
// reflection because it's only available in later versions of YARN.
private val nodeLabelConstructor = labelExpression.flatMap { expr =>
try {
Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
classOf[String]))
} catch {
case e: NoSuchMethodException => {
logWarning(s"Node label expression $expr will be ignored because YARN version on" +
" classpath does not support it.")
None
}
}
}
def getNumExecutorsRunning: Int = numExecutorsRunning
def getNumExecutorsFailed: Int = numExecutorsFailed
@ -211,7 +229,7 @@ private[yarn] class YarnAllocator(
s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
for (i <- 0 until missing) {
val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
val request = createContainerRequest(resource)
amClient.addContainerRequest(request)
val nodes = request.getNodes
val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
@ -230,6 +248,17 @@ private[yarn] class YarnAllocator(
}
}
/**
* Creates a container request, handling the reflection required to use YARN features that were
* added in recent versions.
*/
private def createContainerRequest(resource: Resource): ContainerRequest = {
nodeLabelConstructor.map { constructor =>
constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
labelExpression.orNull)
}.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY))
}
/**
* Handle containers granted by the RM by launching executors on them.
*