Merge pull request #371 from tgravescs/yarn_client_addjar_misc_fixes
Yarn client addjar and misc fixes. Fix the addJar functionality in yarn-client mode, add support for the other options supported in yarn-standalone mode, set the application type on YARN in Hadoop 2.x, add documentation, and change the heartbeat interval to use the same code as yarn-standalone so it doesn't take so long to get containers and exit.
This commit is contained in:
commit
7cef8435d7
|
@ -677,10 +677,10 @@ class SparkContext(
|
|||
key = uri.getScheme match {
|
||||
// A JAR file which exists only on the driver node
|
||||
case null | "file" =>
|
||||
if (SparkHadoopUtil.get.isYarnMode()) {
|
||||
// In order for this to work on yarn the user must specify the --addjars option to
|
||||
// the client to upload the file into the distributed cache to make it show up in the
|
||||
// current working directory.
|
||||
if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
|
||||
// In order for this to work in yarn standalone mode the user must specify the
|
||||
// --addjars option to the client to upload the file into the distributed cache
|
||||
// of the AM to make it show up in the current working directory.
|
||||
val fileName = new Path(uri.getPath).getName()
|
||||
try {
|
||||
env.httpFileServer.addJar(new File(fileName))
|
||||
|
|
|
@ -101,7 +101,19 @@ With this mode, your application is actually run on the remote machine where the
|
|||
|
||||
With yarn-client mode, the application will be launched locally, just like running an application or spark-shell in Local / Mesos / Standalone mode. The launch method is also similar; just make sure that when you need to specify a master URL, use "yarn-client" instead. You also need to export the env values for SPARK_JAR and SPARK_YARN_APP_JAR
|
||||
|
||||
In order to tune worker cores/number/memory etc., you need to export SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_WORKER_INSTANCES, e.g. in ./conf/spark-env.sh
|
||||
Configuration in yarn-client mode:
|
||||
|
||||
In order to tune worker cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark-env.sh). The following is the list of options.
|
||||
|
||||
* `SPARK_YARN_APP_JAR`, Path to your application's JAR file (required)
|
||||
* `SPARK_WORKER_INSTANCES`, Number of workers to start (Default: 2)
|
||||
* `SPARK_WORKER_CORES`, Number of cores for the workers (Default: 1).
|
||||
* `SPARK_WORKER_MEMORY`, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
|
||||
* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
|
||||
* `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark)
|
||||
* `SPARK_YARN_QUEUE`, The hadoop queue to use for allocation requests (Default: 'default')
|
||||
* `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed with the job.
|
||||
* `SPARK_YARN_DIST_ARCHIVES`, Comma separated list of archives to be distributed with the job.
|
||||
|
||||
For example:
|
||||
|
||||
|
@ -114,7 +126,6 @@ For example:
|
|||
SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
|
||||
MASTER=yarn-client ./bin/spark-shell
|
||||
|
||||
You can also send extra files to the YARN cluster for the workers to use by exporting SPARK_YARN_DIST_FILES=file1,file2,... etc.
|
||||
|
||||
# Building Spark for Hadoop/YARN 2.2.x
|
||||
|
||||
|
|
|
@ -127,13 +127,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
|
|||
// local dirs, so lets check both. We assume one of the 2 is set.
|
||||
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
|
||||
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
|
||||
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
|
||||
.getOrElse(""))
|
||||
.orElse(Option(System.getenv("LOCAL_DIRS")))
|
||||
|
||||
if (localDirs.isEmpty()) {
|
||||
throw new Exception("Yarn Local dirs can't be empty")
|
||||
localDirs match {
|
||||
case None => throw new Exception("Yarn Local dirs can't be empty")
|
||||
case Some(l) => l
|
||||
}
|
||||
localDirs
|
||||
}
|
||||
|
||||
private def getApplicationAttemptId(): ApplicationAttemptId = {
|
||||
|
|
|
@ -76,6 +76,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
|
|||
|
||||
def run() {
|
||||
|
||||
// Setup the directories so things go to yarn approved directories rather
|
||||
// then user specified and /tmp.
|
||||
System.setProperty("spark.local.dir", getLocalDirs())
|
||||
|
||||
appAttemptId = getApplicationAttemptId()
|
||||
resourceManager = registerWithResourceManager()
|
||||
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
|
||||
|
@ -103,10 +107,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
|
|||
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
|
||||
|
||||
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
|
||||
// must be <= timeoutInterval/ 2.
|
||||
// On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
|
||||
// so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
|
||||
val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
|
||||
// we want to be reasonably responsive without causing too many requests to RM.
|
||||
val schedulerInterval =
|
||||
System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
|
||||
// must be <= timeoutInterval / 2.
|
||||
val interval = math.min(timeoutInterval / 2, schedulerInterval)
|
||||
|
||||
reporterThread = launchReporterThread(interval)
|
||||
|
||||
// Wait for the reporter thread to Finish.
|
||||
|
@ -119,6 +125,20 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
|
|||
System.exit(0)
|
||||
}
|
||||
|
||||
/** Get the Yarn approved local directories. */
|
||||
private def getLocalDirs(): String = {
|
||||
// Hadoop 0.23 and 2.x have different Environment variable names for the
|
||||
// local dirs, so lets check both. We assume one of the 2 is set.
|
||||
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
|
||||
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
|
||||
.orElse(Option(System.getenv("LOCAL_DIRS")))
|
||||
|
||||
localDirs match {
|
||||
case None => throw new Exception("Yarn Local dirs can't be empty")
|
||||
case Some(l) => l
|
||||
}
|
||||
}
|
||||
|
||||
private def getApplicationAttemptId(): ApplicationAttemptId = {
|
||||
val envs = System.getenv()
|
||||
val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
|
||||
|
|
|
@ -22,6 +22,8 @@ import org.apache.spark.{SparkException, Logging, SparkContext}
|
|||
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
|
||||
import org.apache.spark.scheduler.TaskSchedulerImpl
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
private[spark] class YarnClientSchedulerBackend(
|
||||
scheduler: TaskSchedulerImpl,
|
||||
sc: SparkContext)
|
||||
|
@ -31,45 +33,47 @@ private[spark] class YarnClientSchedulerBackend(
|
|||
var client: Client = null
|
||||
var appId: ApplicationId = null
|
||||
|
||||
private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) {
|
||||
Option(System.getenv(optionalParam)) foreach {
|
||||
optParam => {
|
||||
arrayBuf += (optionName, optParam)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def start() {
|
||||
super.start()
|
||||
|
||||
val defalutWorkerCores = "2"
|
||||
val defalutWorkerMemory = "512m"
|
||||
val defaultWorkerNumber = "1"
|
||||
|
||||
val userJar = System.getenv("SPARK_YARN_APP_JAR")
|
||||
val distFiles = System.getenv("SPARK_YARN_DIST_FILES")
|
||||
var workerCores = System.getenv("SPARK_WORKER_CORES")
|
||||
var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
|
||||
var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
|
||||
|
||||
if (userJar == null)
|
||||
throw new SparkException("env SPARK_YARN_APP_JAR is not set")
|
||||
|
||||
if (workerCores == null)
|
||||
workerCores = defalutWorkerCores
|
||||
if (workerMemory == null)
|
||||
workerMemory = defalutWorkerMemory
|
||||
if (workerNumber == null)
|
||||
workerNumber = defaultWorkerNumber
|
||||
|
||||
val driverHost = conf.get("spark.driver.host")
|
||||
val driverPort = conf.get("spark.driver.port")
|
||||
val hostport = driverHost + ":" + driverPort
|
||||
|
||||
val argsArray = Array[String](
|
||||
val argsArrayBuf = new ArrayBuffer[String]()
|
||||
argsArrayBuf += (
|
||||
"--class", "notused",
|
||||
"--jar", userJar,
|
||||
"--args", hostport,
|
||||
"--worker-memory", workerMemory,
|
||||
"--worker-cores", workerCores,
|
||||
"--num-workers", workerNumber,
|
||||
"--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher",
|
||||
"--files", distFiles
|
||||
"--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
|
||||
)
|
||||
|
||||
val args = new ClientArguments(argsArray, conf)
|
||||
// process any optional arguments, use the defaults already defined in ClientArguments
|
||||
// if things aren't specified
|
||||
Map("--master-memory" -> "SPARK_MASTER_MEMORY",
|
||||
"--num-workers" -> "SPARK_WORKER_INSTANCES",
|
||||
"--worker-memory" -> "SPARK_WORKER_MEMORY",
|
||||
"--worker-cores" -> "SPARK_WORKER_CORES",
|
||||
"--queue" -> "SPARK_YARN_QUEUE",
|
||||
"--name" -> "SPARK_YARN_APP_NAME",
|
||||
"--files" -> "SPARK_YARN_DIST_FILES",
|
||||
"--archives" -> "SPARK_YARN_DIST_ARCHIVES")
|
||||
.foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
|
||||
|
||||
logDebug("ClientArguments called with: " + argsArrayBuf)
|
||||
val args = new ClientArguments(argsArrayBuf.toArray, conf)
|
||||
client = new Client(args, conf)
|
||||
appId = client.runApp()
|
||||
waitForApp()
|
||||
|
|
|
@ -116,13 +116,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
|
|||
// local dirs, so lets check both. We assume one of the 2 is set.
|
||||
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
|
||||
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
|
||||
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
|
||||
.getOrElse(""))
|
||||
.orElse(Option(System.getenv("LOCAL_DIRS")))
|
||||
|
||||
if (localDirs.isEmpty()) {
|
||||
throw new Exception("Yarn Local dirs can't be empty")
|
||||
localDirs match {
|
||||
case None => throw new Exception("Yarn Local dirs can't be empty")
|
||||
case Some(l) => l
|
||||
}
|
||||
localDirs
|
||||
}
|
||||
|
||||
private def getApplicationAttemptId(): ApplicationAttemptId = {
|
||||
|
|
|
@ -99,6 +99,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
|
|||
appContext.setApplicationName(args.appName)
|
||||
appContext.setQueue(args.amQueue)
|
||||
appContext.setAMContainerSpec(amContainer)
|
||||
appContext.setApplicationType("SPARK")
|
||||
|
||||
// Memory for the ApplicationMaster.
|
||||
val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
|
||||
|
|
|
@ -78,6 +78,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
|
|||
|
||||
def run() {
|
||||
|
||||
// Setup the directories so things go to yarn approved directories rather
|
||||
// then user specified and /tmp.
|
||||
System.setProperty("spark.local.dir", getLocalDirs())
|
||||
|
||||
amClient = AMRMClient.createAMRMClient()
|
||||
amClient.init(yarnConf)
|
||||
amClient.start()
|
||||
|
@ -94,10 +98,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
|
|||
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
|
||||
|
||||
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
|
||||
// must be <= timeoutInterval/ 2.
|
||||
// On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
|
||||
// so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
|
||||
val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
|
||||
// we want to be reasonably responsive without causing too many requests to RM.
|
||||
val schedulerInterval =
|
||||
System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
|
||||
// must be <= timeoutInterval / 2.
|
||||
val interval = math.min(timeoutInterval / 2, schedulerInterval)
|
||||
|
||||
reporterThread = launchReporterThread(interval)
|
||||
|
||||
// Wait for the reporter thread to Finish.
|
||||
|
@ -110,6 +116,20 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
|
|||
System.exit(0)
|
||||
}
|
||||
|
||||
/** Get the Yarn approved local directories. */
|
||||
private def getLocalDirs(): String = {
|
||||
// Hadoop 0.23 and 2.x have different Environment variable names for the
|
||||
// local dirs, so lets check both. We assume one of the 2 is set.
|
||||
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
|
||||
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
|
||||
.orElse(Option(System.getenv("LOCAL_DIRS")))
|
||||
|
||||
localDirs match {
|
||||
case None => throw new Exception("Yarn Local dirs can't be empty")
|
||||
case Some(l) => l
|
||||
}
|
||||
}
|
||||
|
||||
private def getApplicationAttemptId(): ApplicationAttemptId = {
|
||||
val envs = System.getenv()
|
||||
val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
|
||||
|
|
Loading…
Reference in a new issue