[SPARK-7173][YARN] Add label expression support for application master

Add label expression support for AM to restrict it runs on the specific set of nodes. I tested it locally and works fine.

sryza and vanzin please help to review, thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #9800 from jerryshao/SPARK-7173.
This commit is contained in:
jerryshao 2015-11-23 10:41:17 -08:00 committed by Marcelo Vanzin
parent 946b406519
commit 5fd86e4fc2
2 changed files with 34 additions and 1 deletions

View file

@ -326,6 +326,15 @@ If you need a reference to the proper location to put log files in the YARN so t
Otherwise, the client process will exit after submission.
</td>
</tr>
<tr>
<td><code>spark.yarn.am.nodeLabelExpression</code></td>
<td>(none)</td>
<td>
A YARN node label expression that restricts the set of nodes AM will be scheduled on.
Only versions of YARN greater than or equal to 2.6 support node label expressions, so when
running against earlier versions, this property will be ignored.
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.nodeLabelExpression</code></td>
<td>(none)</td>

View file

@ -225,7 +225,31 @@ private[spark] class Client(
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
capability.setVirtualCores(args.amCores)
appContext.setResource(capability)
if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
try {
val amRequest = Records.newRecord(classOf[ResourceRequest])
amRequest.setResourceName(ResourceRequest.ANY)
amRequest.setPriority(Priority.newInstance(0))
amRequest.setCapability(capability)
amRequest.setNumContainers(1)
val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression")
val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
method.invoke(amRequest, amLabelExpression)
val setResourceRequestMethod =
appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
setResourceRequestMethod.invoke(appContext, amRequest)
} catch {
case e: NoSuchMethodException =>
logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " +
"of YARN does not support it")
appContext.setResource(capability)
}
} else {
appContext.setResource(capability)
}
appContext
}