SPARK-1183. Don't use "worker" to mean executor

Author: Sandy Ryza <sandy@cloudera.com>

Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:

5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
Authored by Sandy Ryza on 2014-03-13 12:11:33 -07:00; committed by Patrick Wendell
parent e4e8d8f395
commit 698373211e
21 changed files with 312 additions and 294 deletions


@ -13,7 +13,7 @@ object in your main program (called the _driver program_).
Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_
(either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across
applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are
worker processes that run computations and store data for your application.
processes that run computations and store data for your application.
Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to
the executors. Finally, SparkContext sends *tasks* for the executors to run.
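
A minimal Scala sketch of this flow, under the assumption of a reachable cluster (the app name and partition count are illustrative): the driver builds a SparkContext, the cluster manager grants executors, and the final action runs as tasks on those executors.

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

object ExecutorFlowSketch {
  def main(args: Array[String]): Unit = {
    // The driver program: connects to a cluster manager through the SparkContext.
    val conf = new SparkConf().setAppName("executor-flow-sketch")
    val sc = new SparkContext(conf)

    // 8 partitions -> 8 tasks; the tasks are shipped to and executed on the executors.
    val histogram = sc.parallelize(1 to 1000, numSlices = 8)
      .map(_ % 10)
      .countByValue()

    println(histogram)
    sc.stop()
  }
}
{% endhighlight %}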


@ -135,7 +135,7 @@ Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Chan
structure of the graph are accomplished by producing a new graph with the desired changes. Note
that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices)
are reused in the new graph reducing the cost of this inherently functional data-structure. The
graph is partitioned across the workers using a range of vertex-partitioning heuristics. As with
graph is partitioned across the executors using a range of vertex-partitioning heuristics. As with
RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the
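
For reference, a minimal sketch of building such a property graph (vertex and edge values are illustrative; `sc` is assumed to be an existing SparkContext). The underlying vertex and edge RDDs are what get partitioned across the executors.

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}

def tinyGraph(sc: SparkContext): Graph[String, String] = {
  // Vertex RDD: (id, attribute) pairs; edge RDD: Edge(srcId, dstId, attribute).
  val vertices = sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
  val edges    = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
  // Like any RDD, a lost partition of the resulting graph can be rebuilt on another machine.
  Graph(vertices, edges)
}
{% endhighlight %}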


@ -39,8 +39,8 @@ Resource allocation can be configured as follows, based on the cluster type:
* **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,
and optionally set `spark.cores.max` to limit each application's resource share as in the standalone mode.
You should also set `spark.executor.memory` to control the executor memory.
* **YARN:** The `--num-workers` option to the Spark YARN client controls how many workers it will allocate
on the cluster, while `--worker-memory` and `--worker-cores` control the resources per worker.
* **YARN:** The `--num-executors` option to the Spark YARN client controls how many executors it will allocate
on the cluster, while `--executor-memory` and `--executor-cores` control the resources per executor.
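
A spark-shell-style sketch of configuring static partitioning programmatically with the properties named above (the values are illustrative, not recommendations):

{% highlight scala %}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("static-partitioning-sketch")
  .set("spark.mesos.coarse", "true")    // coarse-grained Mesos mode
  .set("spark.cores.max", "16")         // cap this application's total cores
  .set("spark.executor.memory", "2g")   // memory per executor
{% endhighlight %}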
A second option available on Mesos is _dynamic sharing_ of CPU cores. In this mode, each Spark application
still has a fixed and independent memory allocation (set by `spark.executor.memory`), but when the


@ -77,8 +77,8 @@ between the two goals of small loss and small model complexity.
**Distributed Datasets.**
For all currently implemented optimization methods for classification, the data must be
distributed between the worker machines *by examples*. Every machine holds a consecutive block of
the `$n$` example/label pairs `$(\x_i,y_i)$`.
distributed between processes on the worker machines *by examples*. Machines hold consecutive
blocks of the `$n$` example/label pairs `$(\x_i,y_i)$`.
In other words, the input distributed dataset
([RDD](scala-programming-guide.html#resilient-distributed-datasets-rdds)) must be the set of
vectors `$\x_i\in\R^d$`.
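
A small sketch of what "distributed by examples" means in practice: each RDD element is one whole (label, features) pair, so a machine holds complete examples rather than pieces of them (`sc` is assumed to be an existing SparkContext; the data is illustrative).

{% highlight scala %}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def exampleData(sc: SparkContext): RDD[(Double, Array[Double])] =
  sc.parallelize(Seq(
    (1.0, Array(0.0, 1.1, 0.1)),
    (0.0, Array(2.0, 1.0, -1.0)),
    (1.0, Array(2.0, 1.3, 1.0))
  ), numSlices = 3) // three partitions, each holding whole example/label pairs
{% endhighlight %}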


@ -43,9 +43,9 @@ def is_error(line):
errors = logData.filter(is_error)
{% endhighlight %}
PySpark will automatically ship these functions to workers, along with any objects that they reference.
Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
PySpark will automatically ship these functions to executors, along with any objects that they reference.
Instances of classes will be serialized and shipped to executors by PySpark, but classes themselves cannot be automatically distributed to executors.
The [Standalone Use](#standalone-use) section describes how to ship code dependencies to executors.
In addition, PySpark fully supports interactive use---simply run `./bin/pyspark` to launch an interactive shell.
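
The Scala API behaves analogously: the function passed to an RDD operation, together with any objects it references, is serialized on the driver and shipped to the executors. A minimal sketch (the `logData` RDD is assumed to exist):

{% highlight scala %}
import org.apache.spark.rdd.RDD

def findErrors(logData: RDD[String]): RDD[String] = {
  val errorToken = "ERROR"                          // captured by the closure below
  logData.filter(line => line.contains(errorToken)) // closure + captured value are shipped to executors
}
{% endhighlight %}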


@ -41,7 +41,7 @@ System Properties:
* `spark.yarn.submit.file.replication`, the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
* `spark.yarn.preserve.staging.files`, set to true to preserve the staged files (spark jar, app jar, distributed cache files) at the end of the job rather than deleting them.
* `spark.yarn.scheduler.heartbeat.interval-ms`, the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
* `spark.yarn.max.worker.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2 with minimum of 3.
* `spark.yarn.max.executor.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2 with minimum of 3.
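
These properties can also be set programmatically; a spark-shell-style sketch using the renamed key (the values are illustrative):

{% highlight scala %}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.max.executor.failures", "6")     // formerly spark.yarn.max.worker.failures
  .set("spark.yarn.preserve.staging.files", "true")
  .set("spark.yarn.submit.file.replication", "3")
{% endhighlight %}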
# Launching Spark on YARN
@ -60,11 +60,10 @@ The command to launch the Spark application on the cluster is as follows:
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
--num-workers <NUMBER_OF_EXECUTORS> \
--master-class <ApplicationMaster_CLASS>
--master-memory <MEMORY_FOR_MASTER> \
--worker-memory <MEMORY_PER_EXECUTOR> \
--worker-cores <CORES_PER_EXECUTOR> \
--num-executors <NUMBER_OF_EXECUTOR_PROCESSES> \
--driver-memory <MEMORY_FOR_ApplicationMaster> \
--executor-memory <MEMORY_PER_EXECUTOR> \
--executor-cores <CORES_PER_EXECUTOR> \
--name <application_name> \
--queue <queue_name> \
--addJars <any_local_files_used_in_SparkContext.addJar> \
@ -85,10 +84,10 @@ For example:
--jar examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-cluster \
--num-workers 3 \
--master-memory 4g \
--worker-memory 2g \
--worker-cores 1
--num-executors 3 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1
The above starts a YARN client program which starts the default Application Master. SparkPi will then be run as a child thread of the Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the "Viewing Logs" section below for how to see driver and executor logs.
@ -100,12 +99,12 @@ With yarn-client mode, the application will be launched locally, just like runni
Configuration in yarn-client mode:
In order to tune worker cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options.
In order to tune executor cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following is the list of options.
* `SPARK_WORKER_INSTANCES`, Number of executors to start (Default: 2)
* `SPARK_WORKER_CORES`, Number of cores per executor (Default: 1).
* `SPARK_WORKER_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
* `SPARK_EXECUTOR_INSTANCES`, Number of executors to start (Default: 2)
* `SPARK_EXECUTOR_CORES`, Number of cores per executor (Default: 1).
* `SPARK_EXECUTOR_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
* `SPARK_DRIVER_MEMORY`, Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
* `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark)
* `SPARK_YARN_QUEUE`, The YARN queue to use for allocation requests (Default: 'default')
* `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed with the job.
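
A hypothetical helper sketching how the renamed variables could be read with a fallback to the old `SPARK_WORKER_*` names (illustrative only; the actual mapping in this commit lives in YarnClientSchedulerBackend, where both old and new names feed the new `--executor-*` options):

{% highlight scala %}
// Prefer the new variable, fall back to the old one, then to a default.
def envOrElse(newName: String, oldName: String, default: String): String =
  sys.env.get(newName).orElse(sys.env.get(oldName)).getOrElse(default)

val numExecutors   = envOrElse("SPARK_EXECUTOR_INSTANCES", "SPARK_WORKER_INSTANCES", "2")
val executorMemory = envOrElse("SPARK_EXECUTOR_MEMORY", "SPARK_WORKER_MEMORY", "1G")
val executorCores  = envOrElse("SPARK_EXECUTOR_CORES", "SPARK_WORKER_CORES", "1")
{% endhighlight %}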


@ -61,9 +61,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
// Default to numWorkers * 2, with minimum of 3
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
@ -96,7 +96,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Call this to force generation of secret so it gets populated into the
// hadoop UGI. This has to happen before the startUserClass which does a
// doAs in order for the credentials to be passed on to the worker containers.
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// Start the user's JAR
@ -115,7 +115,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
// Allocate all containers
allocateWorkers()
allocateExecutors()
// Wait for the user class to Finish
userThread.join()
@ -215,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
// this need to happen before allocateWorkers
// this needs to happen before allocateExecutors
private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
try {
@ -260,21 +260,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
private def allocateWorkers() {
private def allocateExecutors() {
try {
logInfo("Allocating " + args.numWorkers + " workers.")
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of worker failures reached")
"max number of executor failures reached")
}
yarnAllocator.allocateContainers(
math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
@ -283,7 +283,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All workers have launched.")
logInfo("All executors have launched.")
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
@ -309,15 +309,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
while (userThread.isAlive) {
if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of worker failures reached")
"max number of executor failures reached")
}
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingWorkerCount))
yarnAllocator.allocateContainers(missingWorkerCount)
format(missingExecutorCount))
yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Thread.sleep(sleepTime)


@ -34,7 +34,7 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
@ -89,7 +89,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
if (minimumMemory > 0) {
val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
@ -102,7 +102,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
waitForSparkMaster()
// Allocate all containers
allocateWorkers()
allocateExecutors()
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@ -199,7 +199,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
private def allocateWorkers() {
private def allocateExecutors() {
// Fixme: should get preferredNodeLocationData from SparkContext, just fake an empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@ -208,16 +208,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
args, preferredNodeLocationData, sparkConf)
logInfo("Allocating " + args.numWorkers + " workers.")
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(100)
}
logInfo("All workers have launched.")
logInfo("All executors have launched.")
}
@ -228,10 +228,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val t = new Thread {
override def run() {
while (!driverClosed) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingWorkerCount)
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
Thread.sleep(sleepTime)
@ -264,9 +264,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
object WorkerLauncher {
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new WorkerLauncher(args).run()
new ExecutorLauncher(args).run()
}
}


@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.spark.{SparkConf, Logging}
class WorkerRunnable(
class ExecutorRunnable(
container: Container,
conf: Configuration,
spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
workerMemory: Int,
workerCores: Int)
extends Runnable with WorkerRunnableUtil with Logging {
executorMemory: Int,
executorCores: Int)
extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = _
@ -55,7 +55,7 @@ class WorkerRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
logInfo("Starting Worker Container")
logInfo("Starting Executor Container")
cm = connectToCM
startContainer
}
@ -81,8 +81,8 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
logInfo("Setting up worker with commands: " + commands)
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager


@ -58,9 +58,9 @@ private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val resourceManager: AMRMProtocol,
val appAttemptId: ApplicationAttemptId,
val maxWorkers: Int,
val workerMemory: Int,
val workerCores: Int,
val maxExecutors: Int,
val executorMemory: Int,
val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
@ -84,39 +84,39 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
private val numWorkersRunning = new AtomicInteger()
// Used to generate a unique id per worker
private val workerIdCounter = new AtomicInteger()
private val numExecutorsRunning = new AtomicInteger()
// Used to generate a unique id per executor
private val executorIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
private val numWorkersFailed = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
def getNumWorkersRunning: Int = numWorkersRunning.intValue
def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
def getNumWorkersFailed: Int = numWorkersFailed.intValue
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
def allocateContainers(workersToRequest: Int) {
def allocateContainers(executorsToRequest: Int) {
// We need to send the request only once from what I understand ... but for now, not modifying
// this much.
// Keep polling the Resource Manager for containers
val amResp = allocateWorkerResources(workersToRequest).getAMResponse
val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
val _allocatedContainers = amResp.getAllocatedContainers()
if (_allocatedContainers.size > 0) {
logDebug("""
Allocated containers: %d
Current worker count: %d
Current executor count: %d
Containers released: %s
Containers to be released: %s
Cluster resources: %s
""".format(
_allocatedContainers.size,
numWorkersRunning.get(),
numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers,
amResp.getAvailableResources))
@ -221,59 +221,59 @@ private[yarn] class YarnAllocationHandler(
// Run each of the allocated containers
for (container <- allocatedContainers) {
val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
val workerHostname = container.getNodeId.getHost
val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
assert(
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
if (numWorkersRunningNow > maxWorkers) {
if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
containers for it.""".format(containerId, workerHostname))
containers for it.""".format(containerId, executorHostname))
releasedContainerList.add(containerId)
// reset counter back to old value.
numWorkersRunning.decrementAndGet()
numExecutorsRunning.decrementAndGet()
}
else {
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
// (workerIdCounter)
val workerId = workerIdCounter.incrementAndGet().toString
// (executorIdCounter)
val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("launching container on " + containerId + " host " + workerHostname)
logInfo("launching container on " + containerId + " host " + executorHostname)
// Just to be safe, simply remove it from pendingReleaseContainers.
// Should not be there, but ..
pendingReleaseContainers.remove(containerId)
val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
allocatedContainerToHostMap.put(containerId, workerHostname)
allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
new Thread(
new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
workerHostname, workerMemory, workerCores)
new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId,
executorHostname, executorMemory, executorCores)
).start()
}
}
logDebug("""
Finished processing %d containers.
Current number of workers running: %d,
Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainers.size,
numWorkersRunning.get(),
numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@ -292,7 +292,7 @@ private[yarn] class YarnAllocationHandler(
}
else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
numWorkersRunning.decrementAndGet()
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
@ -302,7 +302,7 @@ private[yarn] class YarnAllocationHandler(
// now I think it's ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
numWorkersFailed.incrementAndGet()
numExecutorsFailed.incrementAndGet()
}
}
@ -332,12 +332,12 @@ private[yarn] class YarnAllocationHandler(
}
logDebug("""
Finished processing %d completed containers.
Current number of workers running: %d,
Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
numWorkersRunning.get(),
numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@ -387,18 +387,18 @@ private[yarn] class YarnAllocationHandler(
retval
}
private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
private def allocateExecutorResources(numExecutors: Int): AllocateResponse = {
var resourceRequests: List[ResourceRequest] = null
// default.
if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
}
else {
// request for all hosts in preferred nodes and for numWorkers -
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
@ -419,7 +419,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests: ResourceRequest = createResourceRequest(
AllocationType.ANY,
resource = null,
numWorkers,
numExecutors,
YarnAllocationHandler.PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@ -441,9 +441,9 @@ private[yarn] class YarnAllocationHandler(
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
if (numWorkers > 0) {
logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
if (numExecutors > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
}
else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
@ -464,7 +464,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequest(
requestType: AllocationType.AllocationType,
resource:String,
numWorkers: Int,
numExecutors: Int,
priority: Int): ResourceRequest = {
// If a hostname is specified, we need at least two requests - node local and rack local.
@ -473,7 +473,7 @@ private[yarn] class YarnAllocationHandler(
case AllocationType.HOST => {
assert(YarnAllocationHandler.ANY_HOST != resource)
val hostname = resource
val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
// Add to host->rack mapping
YarnAllocationHandler.populateRackInfo(conf, hostname)
@ -482,10 +482,10 @@ private[yarn] class YarnAllocationHandler(
}
case AllocationType.RACK => {
val rack = resource
createResourceRequestImpl(rack, numWorkers, priority)
createResourceRequestImpl(rack, numExecutors, priority)
}
case AllocationType.ANY => createResourceRequestImpl(
YarnAllocationHandler.ANY_HOST, numWorkers, priority)
YarnAllocationHandler.ANY_HOST, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@ -493,13 +493,13 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequestImpl(
hostname:String,
numWorkers: Int,
numExecutors: Int,
priority: Int): ResourceRequest = {
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
val memCapability = Records.newRecord(classOf[Resource])
// There probably is some overhead here, let's reserve a bit more memory.
memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
rsrcRequest.setCapability(memCapability)
val pri = Records.newRecord(classOf[Priority])
@ -508,7 +508,7 @@ private[yarn] class YarnAllocationHandler(
rsrcRequest.setHostName(hostname)
rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0))
rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0))
rsrcRequest
}
@ -560,9 +560,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
args.numWorkers,
args.workerMemory,
args.workerCores,
args.numExecutors,
args.executorMemory,
args.executorCores,
Map[String, Int](),
Map[String, Int](),
sparkConf)
@ -582,9 +582,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
args.numWorkers,
args.workerMemory,
args.workerCores,
args.numExecutors,
args.executorMemory,
args.executorCores,
hostToCount,
rackToCount,
sparkConf)
@ -594,9 +594,9 @@ object YarnAllocationHandler {
conf: Configuration,
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
maxWorkers: Int,
workerMemory: Int,
workerCores: Int,
maxExecutors: Int,
executorMemory: Int,
executorCores: Int,
map: collection.Map[String, collection.Set[SplitInfo]],
sparkConf: SparkConf): YarnAllocationHandler = {
@ -606,9 +606,9 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
maxWorkers,
workerMemory,
workerCores,
maxExecutors,
executorMemory,
executorCores,
hostToCount,
rackToCount,
sparkConf)


@ -24,9 +24,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
var executorMemory = 1024
var executorCores = 1
var numExecutors = 2
parseArgs(args.toList)
@ -36,7 +36,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
var args = inputArgs
while (! args.isEmpty) {
// --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
// the properties with executor in their names are preferred.
args match {
case ("--jar") :: value :: tail =>
userJar = value
@ -50,16 +51,16 @@ class ApplicationMasterArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail
case ("--num-workers") :: IntParam(value) :: tail =>
numWorkers = value
case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
numExecutors = value
args = tail
case ("--worker-memory") :: IntParam(value) :: tail =>
workerMemory = value
case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail =>
executorMemory = value
args = tail
case ("--worker-cores") :: IntParam(value) :: tail =>
workerCores = value
case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
executorCores = value
args = tail
case Nil =>
@ -86,9 +87,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
" --num-executors NUM Number of executors to start (Default: 2)\n" +
" --executor-cores NUM Number of cores for the executors (Default: 1)\n" +
" --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n")
System.exit(exitCode)
}
}


@ -33,9 +33,9 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
var workerMemory = 1024 // MB
var workerCores = 1
var numWorkers = 2
var executorMemory = 1024 // MB
var executorCores = 1
var numExecutors = 2
var amQueue = sparkConf.get("QUEUE", "default")
var amMemory: Int = 512 // MB
var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
@ -67,24 +67,39 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
userArgsBuffer += value
args = tail
case ("--master-class") :: value :: tail =>
case ("--master-class" | "--am-class") :: value :: tail =>
if (args(0) == "--master-class") {
println("--master-class is deprecated. Use --am-class instead.")
}
amClass = value
args = tail
case ("--master-memory") :: MemoryParam(value) :: tail =>
case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
if (args(0) == "--master-memory") {
println("--master-memory is deprecated. Use --driver-memory instead.")
}
amMemory = value
args = tail
case ("--num-workers") :: IntParam(value) :: tail =>
numWorkers = value
case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
}
numExecutors = value
args = tail
case ("--worker-memory") :: MemoryParam(value) :: tail =>
workerMemory = value
case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
if (args(0) == "--worker-memory") {
println("--worker-memory is deprecated. Use --executor-memory instead.")
}
executorMemory = value
args = tail
case ("--worker-cores") :: IntParam(value) :: tail =>
workerCores = value
case ("--worker-cores" | "--executor-memory") :: IntParam(value) :: tail =>
if (args(0) == "--worker-cores") {
println("--worker-cores is deprecated. Use --executor-cores instead.")
}
executorCores = value
args = tail
case ("--queue") :: value :: tail =>
@ -133,11 +148,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --args ARGS Arguments to be passed to your application's main class.\n" +
" Mutliple invocations are possible, each will be passed in order.\n" +
" --num-workers NUM Number of workers to start (Default: 2)\n" +
" --worker-cores NUM Number of cores for the workers (Default: 1).\n" +
" --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
" --num-executors NUM Number of executors to start (Default: 2)\n" +
" --executor-cores NUM Number of cores for the executors (Default: 1).\n" +
" --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" +
" --name NAME The name of your application (Default: Spark)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
" --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" +


@ -73,10 +73,10 @@ trait ClientBase extends Logging {
((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
(args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
"greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
(args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" +
(args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size " +
"must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
).foreach { case(cond, errStr) =>
if (cond) {
@ -95,9 +95,9 @@ trait ClientBase extends Logging {
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
// If we have requested more than the cluster's max for a single resource then exit.
if (args.workerMemory > maxMem) {
logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.".
format(args.workerMemory, maxMem))
if (args.executorMemory > maxMem) {
logError("Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster.".
format(args.executorMemory, maxMem))
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@ -276,7 +276,7 @@ trait ClientBase extends Logging {
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
// Set the environment variables to be passed on to the Workers.
// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(env)
distCacheMgr.setDistArchivesEnv(env)
@ -360,9 +360,9 @@ trait ClientBase extends Logging {
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
" --worker-memory " + args.workerMemory +
" --worker-cores " + args.workerCores +
" --num-workers " + args.numWorkers +
" --executor-memory " + args.executorMemory +
" --executor-cores " + args.executorCores +
" --num-executors " + args.numExecutors +
" 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")


@ -46,10 +46,10 @@ class ClientDistributedCacheManager() extends Logging {
/**
* Add a resource to the list of distributed cache resources. This list can
* be sent to the ApplicationMaster and possibly the workers so that it can
* be sent to the ApplicationMaster and possibly the executors so that it can
* be downloaded into the Hadoop distributed cache for use by this application.
* Adds the LocalResource to the localResources HashMap passed in and saves
* the stats of the resources to they can be sent to the workers and verified.
the stats of the resources so they can be sent to the executors and verified.
*
* @param fs FileSystem
* @param conf Configuration


@ -39,7 +39,7 @@ import org.apache.spark.{SparkConf, Logging}
import org.apache.hadoop.yarn.conf.YarnConfiguration
trait WorkerRunnableUtil extends Logging {
trait ExecutorRunnableUtil extends Logging {
val yarnConf: YarnConfiguration
val sparkConf: SparkConf
@ -49,13 +49,13 @@ trait WorkerRunnableUtil extends Logging {
masterAddress: String,
slaveId: String,
hostname: String,
workerMemory: Int,
workerCores: Int) = {
executorMemory: Int,
executorCores: Int) = {
// Extra options for the JVM
var JAVA_OPTS = ""
// Set the JVM memory
val workerMemoryString = workerMemory + "m"
JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
val executorMemoryString = executorMemory + "m"
JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
@ -97,7 +97,7 @@ trait WorkerRunnableUtil extends Logging {
val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
// Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
// an inconsistent state.
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
@ -107,7 +107,7 @@ trait WorkerRunnableUtil extends Logging {
masterAddress + " " +
slaveId + " " +
hostname + " " +
workerCores +
executorCores +
" 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
" 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")


@ -25,7 +25,7 @@ import org.apache.spark.util.Utils
/**
*
* This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
* This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
*/
private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
@ -40,7 +40,7 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
override def postStartHook() {
// The yarn application is running, but the worker might not yet ready
// The yarn application is running, but the executors might not be ready yet
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")


@ -53,20 +53,24 @@ private[spark] class YarnClientSchedulerBackend(
"--class", "notused",
"--jar", null,
"--args", hostport,
"--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
"--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
)
// process any optional arguments, use the defaults already defined in ClientArguments
// if things aren't specified
Map("--master-memory" -> "SPARK_MASTER_MEMORY",
"--num-workers" -> "SPARK_WORKER_INSTANCES",
"--worker-memory" -> "SPARK_WORKER_MEMORY",
"--worker-cores" -> "SPARK_WORKER_CORES",
"--queue" -> "SPARK_YARN_QUEUE",
"--name" -> "SPARK_YARN_APP_NAME",
"--files" -> "SPARK_YARN_DIST_FILES",
"--archives" -> "SPARK_YARN_DIST_ARCHIVES")
.foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
"SPARK_DRIVER_MEMORY" -> "--driver-memory",
"SPARK_WORKER_INSTANCES" -> "--num-executors",
"SPARK_WORKER_MEMORY" -> "--executor-memory",
"SPARK_WORKER_CORES" -> "--executor-cores",
"SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
"SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
"SPARK_EXECUTOR_CORES" -> "--executor-cores",
"SPARK_YARN_QUEUE" -> "--queue",
"SPARK_YARN_APP_NAME" -> "--name",
"SPARK_YARN_DIST_FILES" -> "--files",
"SPARK_YARN_DIST_ARCHIVES" -> "--archives")
.foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
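
The `addArg` helper invoked above is not part of this diff; the following is a purely hypothetical body consistent with the call site: if the named environment variable is set, append the corresponding command-line option and its value to the argument buffer.

{% highlight scala %}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch only; the real helper lives elsewhere in YarnClientSchedulerBackend.
def addArg(optName: String, envVar: String, arrayBuf: ArrayBuffer[String]): Unit = {
  sys.env.get(envVar).foreach { value =>
    arrayBuf += optName += value
  }
}
{% endhighlight %}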
@ -77,7 +81,7 @@ private[spark] class YarnClientSchedulerBackend(
def waitForApp() {
// TODO : need a better way to find out whether the workers are ready or not
// TODO : need a better way to find out whether the executors are ready or not
// maybe by resource usage report?
while(true) {
val report = client.getApplicationReport(appId)


@ -64,9 +64,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var isLastAMRetry: Boolean = true
private var amClient: AMRMClient[ContainerRequest] = _
// Default to numWorkers * 2, with minimum of 3
private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(args.numWorkers * 2, 3))
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
private var registered = false
@ -101,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Call this to force generation of secret so it gets populated into the
// hadoop UGI. This has to happen before the startUserClass which does a
// doAs in order for the credentials to be passed on to the worker containers.
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// Start the user's JAR
@ -120,7 +120,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
// Allocate all containers
allocateWorkers()
allocateExecutors()
// Wait for the user class to Finish
userThread.join()
@ -202,7 +202,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
t
}
// This need to happen before allocateWorkers()
// This needs to happen before allocateExecutors()
private def waitForSparkContextInitialized() {
logInfo("Waiting for Spark context initialization")
try {
@ -247,18 +247,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
private def allocateWorkers() {
private def allocateExecutors() {
try {
logInfo("Allocating " + args.numWorkers + " workers.")
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numWorkers)
yarnAllocator.addResourceRequests(args.numExecutors)
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of worker failures reached")
"max number of executor failures reached")
}
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
@ -269,7 +269,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All workers have launched.")
logInfo("All executors have launched.")
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
@ -294,16 +294,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
val t = new Thread {
override def run() {
while (userThread.isAlive) {
if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of worker failures reached")
"max number of executor failures reached")
}
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingWorkerCount > 0) {
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingWorkerCount))
yarnAllocator.addResourceRequests(missingWorkerCount)
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
sendProgress()
Thread.sleep(sleepTime)


@ -35,7 +35,7 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
@ -93,7 +93,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
waitForSparkMaster()
// Allocate all containers
allocateWorkers()
allocateExecutors()
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@ -175,7 +175,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
private def allocateWorkers() {
private def allocateExecutors() {
// Fixme: should get preferredNodeLocationData from SparkContext, just fake an empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@ -189,18 +189,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
preferredNodeLocationData,
sparkConf)
logInfo("Allocating " + args.numWorkers + " workers.")
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numWorkers)
while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
yarnAllocator.addResourceRequests(args.numExecutors)
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateResources()
Thread.sleep(100)
}
logInfo("All workers have launched.")
logInfo("All executors have launched.")
}
@ -211,12 +211,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val t = new Thread {
override def run() {
while (!driverClosed) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingWorkerCount > 0) {
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingWorkerCount))
yarnAllocator.addResourceRequests(missingWorkerCount)
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
sendProgress()
Thread.sleep(sleepTime)
@ -244,9 +244,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
}
object WorkerLauncher {
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new WorkerLauncher(args).run()
new ExecutorLauncher(args).run()
}
}


@ -38,16 +38,16 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
import org.apache.spark.{SparkConf, Logging}
class WorkerRunnable(
class ExecutorRunnable(
container: Container,
conf: Configuration,
spConf: SparkConf,
masterAddress: String,
slaveId: String,
hostname: String,
workerMemory: Int,
workerCores: Int)
extends Runnable with WorkerRunnableUtil with Logging {
executorMemory: Int,
executorCores: Int)
extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
@ -55,7 +55,7 @@ class WorkerRunnable(
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
def run = {
logInfo("Starting Worker Container")
logInfo("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(yarnConf)
nmClient.start()
@ -78,9 +78,9 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
logInfo("Setting up worker with commands: " + commands)
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
// Send the start request to the ContainerManager


@ -60,9 +60,9 @@ private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val amClient: AMRMClient[ContainerRequest],
val appAttemptId: ApplicationAttemptId,
val maxWorkers: Int,
val workerMemory: Int,
val workerCores: Int,
val maxExecutors: Int,
val executorMemory: Int,
val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
@ -89,20 +89,20 @@ private[yarn] class YarnAllocationHandler(
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
private val numWorkersRunning = new AtomicInteger()
// Used to generate a unique id per worker
private val workerIdCounter = new AtomicInteger()
private val numExecutorsRunning = new AtomicInteger()
// Used to generate a unique id per executor
private val executorIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
private val numWorkersFailed = new AtomicInteger()
private val numExecutorsFailed = new AtomicInteger()
def getNumPendingAllocate: Int = numPendingAllocate.intValue
def getNumWorkersRunning: Int = numWorkersRunning.intValue
def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
def getNumWorkersFailed: Int = numWorkersFailed.intValue
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
def releaseContainer(container: Container) {
@ -127,13 +127,13 @@ private[yarn] class YarnAllocationHandler(
logDebug("""
Allocated containers: %d
Current worker count: %d
Current executor count: %d
Containers released: %s
Containers to-be-released: %s
Cluster resources: %s
""".format(
allocatedContainers.size,
numWorkersRunning.get(),
numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers,
allocateResponse.getAvailableResources))
@ -240,64 +240,64 @@ private[yarn] class YarnAllocationHandler(
// Run each of the allocated containers.
for (container <- allocatedContainersToProcess) {
val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
val workerHostname = container.getNodeId.getHost
val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
assert(container.getResource.getMemory >= workerMemoryOverhead)
val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
assert(container.getResource.getMemory >= executorMemoryOverhead)
if (numWorkersRunningNow > maxWorkers) {
if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
containers for it.""".format(containerId, workerHostname))
containers for it.""".format(containerId, executorHostname))
releaseContainer(container)
numWorkersRunning.decrementAndGet()
numExecutorsRunning.decrementAndGet()
} else {
val workerId = workerIdCounter.incrementAndGet().toString
val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
// To be safe, remove the container from `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
allocatedContainerToHostMap.put(containerId, workerHostname)
allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
logInfo("Launching WorkerRunnable. driverUrl: %s, workerHostname: %s".format(driverUrl, workerHostname))
val workerRunnable = new WorkerRunnable(
logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
val executorRunnable = new ExecutorRunnable(
container,
conf,
sparkConf,
driverUrl,
workerId,
workerHostname,
workerMemory,
workerCores)
new Thread(workerRunnable).start()
executorId,
executorHostname,
executorMemory,
executorCores)
new Thread(executorRunnable).start()
}
}
logDebug("""
Finished allocating %s containers (from %s originally).
Current number of workers running: %d,
Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainersToProcess,
allocatedContainers,
numWorkersRunning.get(),
numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@ -314,9 +314,9 @@ private[yarn] class YarnAllocationHandler(
// `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
} else {
// Decrement the number of workers running. The next iteration of the ApplicationMaster's
// Decrement the number of executors running. The next iteration of the ApplicationMaster's
// reporting thread will take care of allocating.
numWorkersRunning.decrementAndGet()
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
@ -326,7 +326,7 @@ private[yarn] class YarnAllocationHandler(
// now I think it's ok as none of the containers are expected to exit
if (completedContainer.getExitStatus() != 0) {
logInfo("Container marked as failed: " + containerId)
numWorkersFailed.incrementAndGet()
numExecutorsFailed.incrementAndGet()
}
}
@ -364,12 +364,12 @@ private[yarn] class YarnAllocationHandler(
}
logDebug("""
Finished processing %d completed containers.
Current number of workers running: %d,
Current number of executors running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
numWorkersRunning.get(),
numExecutorsRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
@ -421,18 +421,18 @@ private[yarn] class YarnAllocationHandler(
retval
}
def addResourceRequests(numWorkers: Int) {
def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
logDebug("numWorkers: " + numWorkers + ", host preferences: " +
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
logDebug("numExecutors: " + numExecutors + ", host preferences: " +
preferredHostToCount.isEmpty)
createResourceRequests(
AllocationType.ANY,
resource = null,
numWorkers,
numExecutors,
YarnAllocationHandler.PRIORITY).toList
} else {
// Request for all hosts in preferred nodes and for numWorkers -
// Request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
@ -452,7 +452,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests = createResourceRequests(
AllocationType.ANY,
resource = null,
numWorkers,
numExecutors,
YarnAllocationHandler.PRIORITY)
val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
@ -468,11 +468,11 @@ private[yarn] class YarnAllocationHandler(
amClient.addContainerRequest(request)
}
if (numWorkers > 0) {
numPendingAllocate.addAndGet(numWorkers)
logInfo("Will Allocate %d worker containers, each with %d memory".format(
numWorkers,
(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
if (numExecutors > 0) {
numPendingAllocate.addAndGet(numExecutors)
logInfo("Will Allocate %d executor containers, each with %d memory".format(
numExecutors,
(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
} else {
logDebug("Empty allocation request ...")
}
@ -494,7 +494,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequests(
requestType: AllocationType.AllocationType,
resource: String,
numWorkers: Int,
numExecutors: Int,
priority: Int
): ArrayBuffer[ContainerRequest] = {
@ -507,7 +507,7 @@ private[yarn] class YarnAllocationHandler(
val nodeLocal = constructContainerRequests(
Array(hostname),
racks = null,
numWorkers,
numExecutors,
priority)
// Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
@ -516,10 +516,10 @@ private[yarn] class YarnAllocationHandler(
}
case AllocationType.RACK => {
val rack = resource
constructContainerRequests(hosts = null, Array(rack), numWorkers, priority)
constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
}
case AllocationType.ANY => constructContainerRequests(
hosts = null, racks = null, numWorkers, priority)
hosts = null, racks = null, numExecutors, priority)
case _ => throw new IllegalArgumentException(
"Unexpected/unsupported request type: " + requestType)
}
@ -528,18 +528,18 @@ private[yarn] class YarnAllocationHandler(
private def constructContainerRequests(
hosts: Array[String],
racks: Array[String],
numWorkers: Int,
numExecutors: Int,
priority: Int
): ArrayBuffer[ContainerRequest] = {
val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val resource = Resource.newInstance(memoryRequest, workerCores)
val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val resource = Resource.newInstance(memoryRequest, executorCores)
val prioritySetting = Records.newRecord(classOf[Priority])
prioritySetting.setPriority(priority)
val requests = new ArrayBuffer[ContainerRequest]()
for (i <- 0 until numWorkers) {
for (i <- 0 until numExecutors) {
requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
}
requests
@ -574,9 +574,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
args.numWorkers,
args.workerMemory,
args.workerCores,
args.numExecutors,
args.executorMemory,
args.executorCores,
Map[String, Int](),
Map[String, Int](),
sparkConf)
@ -596,9 +596,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
args.numWorkers,
args.workerMemory,
args.workerCores,
args.numExecutors,
args.executorMemory,
args.executorCores,
hostToSplitCount,
rackToSplitCount,
sparkConf)
@ -608,9 +608,9 @@ object YarnAllocationHandler {
conf: Configuration,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
maxWorkers: Int,
workerMemory: Int,
workerCores: Int,
maxExecutors: Int,
executorMemory: Int,
executorCores: Int,
map: collection.Map[String, collection.Set[SplitInfo]],
sparkConf: SparkConf
): YarnAllocationHandler = {
@ -619,9 +619,9 @@ object YarnAllocationHandler {
conf,
amClient,
appAttemptId,
maxWorkers,
workerMemory,
workerCores,
maxExecutors,
executorMemory,
executorCores,
hostToCount,
rackToCount,
sparkConf)