[SPARK-4751] Dynamic allocation in standalone mode

Dynamic allocation is a feature that allows a Spark application to scale its number of executors up and down based on the workload. Support was first introduced for YARN in Spark 1.2 and was recently extended to Mesos coarse-grained mode. Today, it is finally supported in standalone mode as well!

I tested this locally and it works as expected. This is WIP because unit tests are coming.
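
As a quick illustration of how an application opts into this feature (a minimal sketch, not part of this patch; the master URL and executor bounds are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: dynamic allocation against a standalone master. The external
// shuffle service must also be enabled on each Worker so that shuffle files
// outlive the executors that wrote them.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")            // placeholder master URL
  .setAppName("dynamic-allocation-demo")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "8")
val sc = new SparkContext(conf)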

Author: Andrew Or <andrew@databricks.com>

Closes #7532 from andrewor14/standalone-da and squashes the following commits:

b3c1736 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
879e928 [Andrew Or] Add end-to-end tests for standalone dynamic allocation
accc8f6 [Andrew Or] Address comments
ee686a8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
c0a2c02 [Andrew Or] Fix build after merge conflict
24149eb [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
2e762d6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
6832bd7 [Andrew Or] Add tests for scheduling with executor limit
a82e907 [Andrew Or] Fix comments
0a8be79 [Andrew Or] Simplify logic by removing the worker blacklist
b7742af [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
2eb5f3f [Andrew Or] Merge branch 'master' of github.com:apache/spark into standalone-da
1334e9a [Andrew Or] Fix MiMa
32abe44 [Andrew Or] Fix style
58cb06f [Andrew Or] Privatize worker blacklist for cleanliness
42ac215 [Andrew Or] Clean up comments and rewrite code for readability
49702d1 [Andrew Or] Clean up shuffle files after application exits
80047aa [Andrew Or] First working implementation
Andrew Or 2015-08-01 11:57:14 -07:00
parent c5166f7a69
commit 6688ba6e68
13 changed files with 759 additions and 176 deletions


@ -177,16 +177,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
s"timed out after ${now - lastSeenMs} ms"))
if (sc.supportDynamicAllocation) {
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
}
})
}
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
}
})
executorLastSeen.remove(executorId)
}
}


@ -531,8 +531,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
assert(supportDynamicAllocation,
"Dynamic allocation of executors is currently only supported in YARN and Mesos mode")
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
None
@ -1361,17 +1359,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
postEnvironmentUpdate()
}
/**
* Return whether dynamically adjusting the amount of resources allocated to
* this application is supported. This is currently only available for YARN
* and Mesos coarse-grained mode.
*/
private[spark] def supportDynamicAllocation: Boolean = {
(master.contains("yarn")
|| master.contains("mesos")
|| _conf.getBoolean("spark.dynamicAllocation.testing", false))
}
/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
@ -1400,8 +1387,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
): Boolean = {
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
@ -1414,12 +1399,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
* Request an additional number of executors from the cluster manager.
* This is currently only supported in YARN mode. Return whether the request is received.
* @return whether the request is received.
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@ -1438,12 +1421,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* through this method with new ones, it should follow up explicitly with a call to
* {{SparkContext#requestExecutors}}.
*
* This is currently only supported in YARN mode. Return whether the request is received.
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
assert(supportDynamicAllocation,
"Killing executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
@ -1462,7 +1443,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* through this method with a new one, it should follow up explicitly with a call to
* {{SparkContext#requestExecutors}}.
*
* This is currently only supported in YARN mode. Return whether the request is received.
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
@ -1479,7 +1460,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* can steal the window of opportunity and acquire this application's resources in the
* mean time.
*
* This is currently only supported in YARN mode. Return whether the request is received.
* @return whether the request is received.
*/
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {


@ -107,6 +107,10 @@ private[deploy] object DeployMessages {
case class MasterChangeAcknowledged(appId: String)
case class RequestExecutors(appId: String, requestedTotal: Int)
case class KillExecutors(appId: String, executorIds: Seq[String])
// Master to AppClient
case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage


@ -70,6 +70,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
server = transportContext.createServer(port, bootstraps)
}
/** Clean up all shuffle files associated with an application that has exited. */
def applicationRemoved(appId: String): Unit = {
blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */)
}
def stop() {
if (server != null) {
server.close()


@ -197,6 +197,22 @@ private[spark] class AppClient(
sendToMaster(UnregisterApplication(appId))
context.reply(true)
stop()
case r: RequestExecutors =>
master match {
case Some(m) => context.reply(m.askWithRetry[Boolean](r))
case None =>
logWarning("Attempted to request executors before registering with Master.")
context.reply(false)
}
case k: KillExecutors =>
master match {
case Some(m) => context.reply(m.askWithRetry[Boolean](k))
case None =>
logWarning("Attempted to kill executors before registering with Master.")
context.reply(false)
}
}
override def onDisconnected(address: RpcAddress): Unit = {
@ -257,4 +273,33 @@ private[spark] class AppClient(
endpoint = null
}
}
/**
* Request executors from the Master by specifying the total number desired,
* including existing pending and running executors.
*
* @return whether the request is acknowledged.
*/
def requestTotalExecutors(requestedTotal: Int): Boolean = {
if (endpoint != null && appId != null) {
endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
false
}
}
/**
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
def killExecutors(executorIds: Seq[String]): Boolean = {
if (endpoint != null && appId != null) {
endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds))
} else {
logWarning("Attempted to kill executors before driver fully initialized.")
false
}
}
}


@ -22,7 +22,6 @@ import java.util.Date
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
@ -43,6 +42,11 @@ private[spark] class ApplicationInfo(
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued by the
// application will this be set to a finite value. This is used for dynamic allocation.
@transient private[master] var executorLimit: Int = _
@transient private var nextExecutorId: Int = _
init()
@ -60,6 +64,7 @@ private[spark] class ApplicationInfo(
appSource = new ApplicationSource(this)
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
executorLimit = Integer.MAX_VALUE
}
private def newExecutorId(useID: Option[Int] = None): Int = {
@ -116,6 +121,12 @@ private[spark] class ApplicationInfo(
state != ApplicationState.WAITING && state != ApplicationState.RUNNING
}
/**
* Return the limit on the number of executors this application can have.
* For testing only.
*/
private[deploy] def getExecutorLimit: Int = executorLimit
def duration: Long = {
if (endTime != -1) {
endTime - startTime


@ -45,7 +45,7 @@ import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
private[master] class Master(
private[deploy] class Master(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
@ -468,6 +468,13 @@ private[master] class Master(
case BoundPortsRequest => {
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
}
case RequestExecutors(appId, requestedTotal) =>
context.reply(handleRequestExecutors(appId, requestedTotal))
case KillExecutors(appId, executorIds) =>
val formattedExecutorIds = formatExecutorIds(executorIds)
context.reply(handleKillExecutors(appId, formattedExecutorIds))
}
override def onDisconnected(address: RpcAddress): Unit = {
@ -563,32 +570,49 @@ private[master] class Master(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
// If the number of cores per executor is not specified, then we can just schedule
// 1 core at a time since we expect a single executor to be launched on each worker
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
val coresPerExecutor = app.desc.coresPerExecutor
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
val oneExecutorPerWorker = coresPerExecutor.isEmpty
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var freeWorkers = (0 until numUsable).toIndexedSeq
/** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {
usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, we may have already started assigning cores to the executor on this worker.
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
val underLimit =
if (launchingNewExecutor) {
assignedExecutors.sum + app.executors.size < app.executorLimit
} else {
true
}
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
coresToAssign >= minCoresPerExecutor &&
underLimit
}
while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
freeWorkers = freeWorkers.filter(canLaunchExecutor)
// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
coresToAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
// If cores per executor is not set, we are assigning 1 core at a time
// without actually meaning to launch 1 executor for each core assigned
if (app.desc.coresPerExecutor.isDefined) {
assignedMemory(pos) += memoryPerExecutor
while (keepScheduling && canLaunchExecutor(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor
// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// Spreading out an application means spreading out its executors across as
@ -600,6 +624,7 @@ private[master] class Master(
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}
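
For readers skimming the diff, here is a self-contained sketch of the new scheduling loop (spread-out variant only, using made-up names such as WorkerRes and omitting the count of already-running executors); it mirrors the canLaunchExecutor check above, including the new executor-limit test:

case class WorkerRes(coresFree: Int, memoryFree: Int)

def scheduleSketch(
    workers: Array[WorkerRes],
    coresPerExecutor: Option[Int],
    memoryPerExecutor: Int,
    executorLimit: Int,
    coresRequested: Int): Array[Int] = {
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val assignedCores = new Array[Int](workers.length)
  val assignedExecutors = new Array[Int](workers.length)
  var coresToAssign = math.min(coresRequested, workers.map(_.coresFree).sum)

  // Same idea as canLaunchExecutor above: check free cores, free memory,
  // remaining cores to assign, and the application's executor limit.
  def canLaunchExecutor(pos: Int): Boolean = {
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    val underLimit = !launchingNewExecutor || assignedExecutors.sum < executorLimit
    val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
    workers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
      workers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
      coresToAssign >= minCoresPerExecutor &&
      underLimit
  }

  // Each round gives every still-eligible worker one executor's worth of cores
  // (a single core when no explicit executor size is set), until no worker can
  // accommodate more or the executor limit is reached.
  var freeWorkers = workers.indices.filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      if (canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        if (oneExecutorPerWorker) assignedExecutors(pos) = 1
        else assignedExecutors(pos) += 1
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}

For example, with two 10-core workers, no explicit cores per executor, executorLimit = 1 and 8 cores requested, all 8 cores land on a single worker, which matches the behaviour the new StandaloneDynamicAllocationSuite asserts for the "max cores <= cores per worker" case.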
@ -785,9 +810,7 @@ private[master] class Master(
rebuildSparkUI(app)
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
exec.state = ExecutorState.KILLED
killExecutor(exec)
}
app.markFinished(state)
if (state != ApplicationState.FINISHED) {
@ -803,6 +826,87 @@ private[master] class Master(
}
}
/**
* Handle a request to set the target number of executors for this application.
*
* If the executor limit is adjusted upwards, new executors will be launched provided
* that there are workers with sufficient resources. If it is adjusted downwards, however,
* we do not kill existing executors until we explicitly receive a kill request.
*
* @return whether the application has previously registered with this Master.
*/
private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = {
idToApp.get(appId) match {
case Some(appInfo) =>
logInfo(s"Application $appId requested to set total executors to $requestedTotal.")
appInfo.executorLimit = requestedTotal
schedule()
true
case None =>
logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
false
}
}
/**
* Handle a kill request from the given application.
*
* This method assumes the executor limit has already been adjusted downwards through
* a separate [[RequestExecutors]] message, such that we do not launch new executors
* immediately after the old ones are removed.
*
* @return whether the application has previously registered with this Master.
*/
private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = {
idToApp.get(appId) match {
case Some(appInfo) =>
logInfo(s"Application $appId requests to kill executors: " + executorIds.mkString(", "))
val (known, unknown) = executorIds.partition(appInfo.executors.contains)
known.foreach { executorId =>
val desc = appInfo.executors(executorId)
appInfo.removeExecutor(desc)
killExecutor(desc)
}
if (unknown.nonEmpty) {
logWarning(s"Application $appId attempted to kill non-existent executors: "
+ unknown.mkString(", "))
}
schedule()
true
case None =>
logWarning(s"Unregistered application $appId requested us to kill executors!")
false
}
}
/**
* Cast the given executor IDs to integers and filter out the ones that fail.
*
* All executor IDs should be integers since we launched these executors. However,
* the kill interface on the driver side accepts arbitrary strings, so we need to
* handle non-integer executor IDs just to be safe.
*/
private def formatExecutorIds(executorIds: Seq[String]): Seq[Int] = {
executorIds.flatMap { executorId =>
try {
Some(executorId.toInt)
} catch {
case e: NumberFormatException =>
logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring")
None
}
}
}
/**
* Ask the worker on which the specified executor is launched to kill the executor.
*/
private def killExecutor(exec: ExecutorDesc): Unit = {
exec.worker.removeExecutor(exec)
exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
exec.state = ExecutorState.KILLED
}
/**
* Rebuild a new SparkUI from the given application's event logs.
* Return the UI if successful, else None

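Putting the new Master handlers together with the SparkContext changes earlier in this diff, the driver-side round trip can be sketched as follows (reusing the sc from the configuration sketch in the commit message; the executor IDs are placeholders). Per the handleKillExecutors comment above, the scheduler backend is expected to lower the executor limit via a RequestExecutors message before the kill is sent, so the killed executors are not immediately replaced:

// Sketch only: the @DeveloperApi entry points shown in the SparkContext diff.
sc.requestExecutors(2)            // Master raises this app's executor limit and reschedules
sc.killExecutors(Seq("0", "1"))   // limit is lowered first, then the Master kills executors 0 and 1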

@ -40,7 +40,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
private[worker] class Worker(
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
@ -553,6 +553,7 @@ private[worker] class Worker(
Utils.deleteRecursively(new File(dir))
}
}
shuffleService.applicationRemoved(id)
}
}
@ -660,6 +661,9 @@ private[worker] class Worker(
}
private[deploy] object Worker extends Logging {
val SYSTEM_NAME = "sparkWorker"
val ENDPOINT_NAME = "Worker"
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
@ -681,13 +685,12 @@ private[deploy] object Worker extends Logging {
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val actorName = "Worker"
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(actorName, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses,
systemName, actorName, workDir, conf, securityMgr))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}


@ -134,7 +134,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
} else {
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
@ -149,6 +148,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(RegisteredExecutor)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
@ -435,7 +436,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Kill the given list of executors through the cluster manager.
* Return whether the kill request is acknowledged.
* @return whether the kill request is acknowledged.
*/
protected def doKillExecutors(executorIds: Seq[String]): Boolean = false


@ -152,6 +152,34 @@ private[spark] class SparkDeploySchedulerBackend(
super.applicationId
}
/**
* Request executors from the Master by specifying the total number desired,
* including existing pending and running executors.
*
* @return whether the request is acknowledged.
*/
protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
Option(client) match {
case Some(c) => c.requestTotalExecutors(requestedTotal)
case None =>
logWarning("Attempted to request executors before driver fully initialized.")
false
}
}
/**
* Kill the given list of executors through the Master.
* @return whether the kill request is acknowledged.
*/
protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
Option(client) match {
case Some(c) => c.killExecutors(executorIds)
case None =>
logWarning("Attempted to kill executors before driver fully initialized.")
false
}
}
private def waitForRegistration() = {
registrationBarrier.acquire()
}


@ -0,0 +1,363 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import org.mockito.Mockito.{mock, when}
import org.scalatest.BeforeAndAfterAll
import org.apache.spark._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.cluster._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
/**
* End-to-end tests for dynamic allocation in standalone mode.
*/
class StandaloneDynamicAllocationSuite
extends SparkFunSuite
with LocalSparkContext
with BeforeAndAfterAll {
private val numWorkers = 2
private val conf = new SparkConf()
private val securityManager = new SecurityManager(conf)
private var masterRpcEnv: RpcEnv = null
private var workerRpcEnvs: Seq[RpcEnv] = null
private var master: Master = null
private var workers: Seq[Worker] = null
/**
* Start the local cluster.
* Note: local-cluster mode is insufficient because we want a reference to the Master.
*/
override def beforeAll(): Unit = {
super.beforeAll()
masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
workerRpcEnvs = (0 until numWorkers).map { i =>
RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
}
master = makeMaster()
workers = makeWorkers(10, 2048)
}
override def afterAll(): Unit = {
masterRpcEnv.shutdown()
workerRpcEnvs.foreach(_.shutdown())
master.stop()
workers.foreach(_.stop())
masterRpcEnv = null
workerRpcEnvs = null
master = null
workers = null
super.afterAll()
}
test("dynamic allocation default behavior") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
// kill all executors
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 1)
// request 1 more
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === 2)
// request 1 more; this one won't go through
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === 3)
// kill all existing executors; we should end up with 3 - 2 = 1 executor
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 1)
// kill all executors again; this time we'll have 1 - 1 = 0 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with max cores <= cores per worker") {
sc = new SparkContext(appConf.set("spark.cores.max", "8"))
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
// kill all executors
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.executors.values.head.cores === 8)
assert(master.apps.head.getExecutorLimit === 1)
// request 1 more; this one won't go through because we're already at max cores.
// This highlights a limitation of using dynamic allocation with max cores WITHOUT
// setting cores per executor: once an application scales down and then scales back
// up, its executors may not be spread out anymore!
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 2)
// request 1 more; this one also won't go through for the same reason
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 3)
// kill all existing executors; we should end up with 3 - 1 = 2 executors
// Note: we scheduled these executors together, so their cores should be evenly distributed
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(master.apps.head.getExecutorLimit === 2)
// kill all executors again; this time we'll have 2 - 2 = 0 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(master.apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with max cores > cores per worker") {
sc = new SparkContext(appConf.set("spark.cores.max", "16"))
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
// kill all executors
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.executors.values.head.cores === 10)
assert(master.apps.head.getExecutorLimit === 1)
// request 1 more
// Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
assert(master.apps.head.getExecutorLimit === 2)
// request 1 more; this one won't go through
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === 3)
// kill all existing executors; we should end up with 3 - 2 = 1 executor
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.executors.values.head.cores === 10)
assert(master.apps.head.getExecutorLimit === 1)
// kill all executors again; this time we'll have 1 - 1 = 0 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
assert(master.apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with cores per executor") {
sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 10) // 20 cores total
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
// kill all executors
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 1)
// request 3 more
assert(sc.requestExecutors(3))
assert(master.apps.head.executors.size === 4)
assert(master.apps.head.getExecutorLimit === 4)
// request 10 more; only 6 will go through
assert(sc.requestExecutors(10))
assert(master.apps.head.executors.size === 10)
assert(master.apps.head.getExecutorLimit === 14)
// kill 2 executors; we should get 2 back immediately
assert(killNExecutors(sc, 2))
assert(master.apps.head.executors.size === 10)
assert(master.apps.head.getExecutorLimit === 12)
// kill 4 executors; we should end up with 12 - 4 = 8 executors
assert(killNExecutors(sc, 4))
assert(master.apps.head.executors.size === 8)
assert(master.apps.head.getExecutorLimit === 8)
// kill all executors; this time we'll have 8 - 8 = 0 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
assert(master.apps.head.executors.size === 10)
assert(master.apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with cores per executor AND max cores") {
sc = new SparkContext(appConf
.set("spark.executor.cores", "2")
.set("spark.cores.max", "8"))
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 4) // 8 cores total
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
// kill all executors
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 1)
// request 3 more
assert(sc.requestExecutors(3))
assert(master.apps.head.executors.size === 4)
assert(master.apps.head.getExecutorLimit === 4)
// request 10 more; none will go through
assert(sc.requestExecutors(10))
assert(master.apps.head.executors.size === 4)
assert(master.apps.head.getExecutorLimit === 14)
// kill all executors; 4 executors will be launched immediately
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 4)
assert(master.apps.head.getExecutorLimit === 10)
// ... and again
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 4)
assert(master.apps.head.getExecutorLimit === 6)
// ... and again; now we end up with 6 - 4 = 2 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === 2)
// ... and again; this time we have 2 - 2 = 0 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
assert(master.apps.head.executors.size === 4)
assert(master.apps.head.getExecutorLimit === 1000)
}
// ===============================
// | Utility methods for testing |
// ===============================
/** Return a SparkConf for applications that want to talk to our Master. */
private def appConf: SparkConf = {
new SparkConf()
.setMaster(masterRpcEnv.address.toSparkURL)
.setAppName("test")
.set("spark.executor.memory", "256m")
}
/** Make a master to which our application will send executor requests. */
private def makeMaster(): Master = {
val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf)
masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
master
}
/** Make a few workers that talk to our master. */
private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
(0 until numWorkers).map { i =>
val rpcEnv = workerRpcEnvs(i)
val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
worker
}
}
/** Kill all executors belonging to this application. */
private def killAllExecutors(sc: SparkContext): Boolean = {
killNExecutors(sc, Int.MaxValue)
}
/** Kill N executors belonging to this application. */
private def killNExecutors(sc: SparkContext, n: Int): Boolean = {
syncExecutors(sc)
sc.killExecutors(getExecutorIds(sc).take(n))
}
/**
* Return a list of executor IDs belonging to this application.
*
* Note that we must use the executor IDs according to the Master, which has the most
* updated view. We cannot rely on the executor IDs according to the driver because we
* don't wait for executors to register. Otherwise the tests will take much longer to run.
*/
private def getExecutorIds(sc: SparkContext): Seq[String] = {
assert(master.idToApp.contains(sc.applicationId))
master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
}
/**
* Sync executor IDs between the driver and the Master.
*
* This allows us to avoid waiting for new executors to register with the driver before
* we submit a request to kill them. This must be called before each kill request.
*/
private def syncExecutors(sc: SparkContext): Unit = {
val driverExecutors = sc.getExecutorStorageStatus
.map(_.blockManagerId.executorId)
.filter { _ != SparkContext.DRIVER_IDENTIFIER}
val masterExecutors = getExecutorIds(sc)
val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
missingExecutors.foreach { id =>
// Fake an executor registration so the driver knows about us
val port = System.currentTimeMillis % 65536
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
}
}
}


@ -120,7 +120,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}
test("Master & worker web ui available") {
test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
@ -144,174 +144,202 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
}
test("basic scheduling - spread out") {
testBasicScheduling(spreadOut = true)
basicScheduling(spreadOut = true)
}
test("basic scheduling - no spread out") {
testBasicScheduling(spreadOut = false)
basicScheduling(spreadOut = false)
}
test("scheduling with max cores - spread out") {
testSchedulingWithMaxCores(spreadOut = true)
schedulingWithMaxCores(spreadOut = true)
}
test("scheduling with max cores - no spread out") {
testSchedulingWithMaxCores(spreadOut = false)
schedulingWithMaxCores(spreadOut = false)
}
test("scheduling with cores per executor - spread out") {
testSchedulingWithCoresPerExecutor(spreadOut = true)
schedulingWithCoresPerExecutor(spreadOut = true)
}
test("scheduling with cores per executor - no spread out") {
testSchedulingWithCoresPerExecutor(spreadOut = false)
schedulingWithCoresPerExecutor(spreadOut = false)
}
test("scheduling with cores per executor AND max cores - spread out") {
testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
schedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
}
test("scheduling with cores per executor AND max cores - no spread out") {
testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
schedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
}
private def testBasicScheduling(spreadOut: Boolean): Unit = {
test("scheduling with executor limit - spread out") {
schedulingWithExecutorLimit(spreadOut = true)
}
test("scheduling with executor limit - no spread out") {
schedulingWithExecutorLimit(spreadOut = false)
}
test("scheduling with executor limit AND max cores - spread out") {
schedulingWithExecutorLimitAndMaxCores(spreadOut = true)
}
test("scheduling with executor limit AND max cores - no spread out") {
schedulingWithExecutorLimitAndMaxCores(spreadOut = false)
}
test("scheduling with executor limit AND cores per executor - spread out") {
schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = true)
}
test("scheduling with executor limit AND cores per executor - no spread out") {
schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = false)
}
test("scheduling with executor limit AND cores per executor AND max cores - spread out") {
schedulingWithEverything(spreadOut = true)
}
test("scheduling with executor limit AND cores per executor AND max cores - no spread out") {
schedulingWithEverything(spreadOut = false)
}
private def basicScheduling(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo = makeAppInfo(1024)
val workerInfo = makeWorkerInfo(4096, 10)
val workerInfos = Array(workerInfo, workerInfo, workerInfo)
val scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
assert(scheduledCores(0) === 10)
assert(scheduledCores(1) === 10)
assert(scheduledCores(2) === 10)
val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
assert(scheduledCores === Array(10, 10, 10))
}
private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
val workerInfo = makeWorkerInfo(4096, 10)
val workerInfos = Array(workerInfo, workerInfo, workerInfo)
var scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
// With spreading out, each worker should be assigned a few cores
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
if (spreadOut) {
assert(scheduledCores(0) === 3)
assert(scheduledCores(1) === 3)
assert(scheduledCores(2) === 2)
assert(scheduledCores1 === Array(3, 3, 2))
assert(scheduledCores2 === Array(6, 5, 5))
} else {
// Without spreading out, the cores should be concentrated on the first worker
assert(scheduledCores(0) === 8)
assert(scheduledCores(1) === 0)
assert(scheduledCores(2) === 0)
}
// Now test the same thing with max cores > cores per worker
scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
if (spreadOut) {
assert(scheduledCores(0) === 6)
assert(scheduledCores(1) === 5)
assert(scheduledCores(2) === 5)
} else {
// Without spreading out, the first worker should be fully booked,
// and the leftover cores should spill over to the second worker only.
assert(scheduledCores(0) === 10)
assert(scheduledCores(1) === 6)
assert(scheduledCores(2) === 0)
assert(scheduledCores1 === Array(8, 0, 0))
assert(scheduledCores2 === Array(10, 6, 0))
}
}
private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
private def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
val workerInfo = makeWorkerInfo(4096, 10)
val workerInfos = Array(workerInfo, workerInfo, workerInfo)
// Each worker should end up with 4 executors with 2 cores each
// This should be 4 because of the memory restriction on each worker
var scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
assert(scheduledCores(0) === 8)
assert(scheduledCores(1) === 8)
assert(scheduledCores(2) === 8)
// Now test the same thing without running into the worker memory limit
// Each worker should now end up with 5 executors with 2 cores each
scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
assert(scheduledCores(0) === 10)
assert(scheduledCores(1) === 10)
assert(scheduledCores(2) === 10)
// Now test the same thing with a cores per executor that 10 is not divisible by
scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
assert(scheduledCores(0) === 9)
assert(scheduledCores(1) === 9)
assert(scheduledCores(2) === 9)
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits
assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2
assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3
}
// Sorry for the long method name!
private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
private def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
val workerInfo = makeWorkerInfo(4096, 10)
val workerInfos = Array(workerInfo, workerInfo, workerInfo)
// We should only launch two executors, each with exactly 2 cores
var scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
if (spreadOut) {
assert(scheduledCores(0) === 2)
assert(scheduledCores(1) === 2)
assert(scheduledCores(2) === 0)
assert(scheduledCores1 === Array(2, 2, 0))
assert(scheduledCores2 === Array(8, 6, 6))
assert(scheduledCores3 === Array(6, 6, 6))
} else {
assert(scheduledCores(0) === 4)
assert(scheduledCores(1) === 0)
assert(scheduledCores(2) === 0)
}
// Test max cores > number of cores per worker
scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
if (spreadOut) {
assert(scheduledCores(0) === 8)
assert(scheduledCores(1) === 6)
assert(scheduledCores(2) === 6)
} else {
assert(scheduledCores(0) === 10)
assert(scheduledCores(1) === 10)
assert(scheduledCores(2) === 0)
}
// Test max cores > number of cores per worker AND
// a cores per executor that is 10 is not divisible by
scheduledCores = master.invokePrivate(
_scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
assert(scheduledCores.length === 3)
if (spreadOut) {
assert(scheduledCores(0) === 6)
assert(scheduledCores(1) === 6)
assert(scheduledCores(2) === 6)
} else {
assert(scheduledCores(0) === 9)
assert(scheduledCores(1) === 9)
assert(scheduledCores(2) === 0)
assert(scheduledCores1 === Array(4, 0, 0))
assert(scheduledCores2 === Array(10, 10, 0))
assert(scheduledCores3 === Array(9, 9, 0))
}
}
// ===============================
// | Utility methods for testing |
// ===============================
private def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo = makeAppInfo(256)
appInfo.executorLimit = 0
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
appInfo.executorLimit = 2
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
appInfo.executorLimit = 5
val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
assert(scheduledCores1 === Array(0, 0, 0))
assert(scheduledCores2 === Array(10, 10, 0))
assert(scheduledCores3 === Array(10, 10, 10))
}
private def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo = makeAppInfo(256, maxCores = Some(16))
appInfo.executorLimit = 0
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
appInfo.executorLimit = 2
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
appInfo.executorLimit = 5
val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
assert(scheduledCores2 === Array(8, 8, 0))
assert(scheduledCores3 === Array(6, 5, 5))
} else {
assert(scheduledCores2 === Array(10, 6, 0))
assert(scheduledCores3 === Array(10, 6, 0))
}
}
private def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo = makeAppInfo(256, coresPerExecutor = Some(4))
appInfo.executorLimit = 0
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
appInfo.executorLimit = 2
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
appInfo.executorLimit = 5
val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
assert(scheduledCores2 === Array(4, 4, 0))
} else {
assert(scheduledCores2 === Array(8, 0, 0))
}
assert(scheduledCores3 === Array(8, 8, 4))
}
// Everything being: executor limit + cores per executor + max cores
private def schedulingWithEverything(spreadOut: Boolean): Unit = {
val master = makeMaster()
val appInfo = makeAppInfo(256, coresPerExecutor = Some(4), maxCores = Some(18))
appInfo.executorLimit = 0
val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
appInfo.executorLimit = 2
val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
appInfo.executorLimit = 5
val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
assert(scheduledCores1 === Array(0, 0, 0))
if (spreadOut) {
assert(scheduledCores2 === Array(4, 4, 0))
assert(scheduledCores3 === Array(8, 4, 4))
} else {
assert(scheduledCores2 === Array(8, 0, 0))
assert(scheduledCores3 === Array(8, 8, 0))
}
}
// ==========================================
// | Utility methods and fields for testing |
// ==========================================
private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
private val workerInfo = makeWorkerInfo(4096, 10)
private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
private def makeMaster(conf: SparkConf = new SparkConf): Master = {
val securityMgr = new SecurityManager(conf)
@ -335,4 +363,12 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
}
private def scheduleExecutorsOnWorkers(
master: Master,
appInfo: ApplicationInfo,
workerInfos: Array[WorkerInfo],
spreadOut: Boolean): Array[Int] = {
master.invokePrivate(_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
}
}


@ -151,6 +151,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
) ++ Seq(
// SPARK-4751 Dynamic allocation for standalone mode
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.SparkContext.supportDynamicAllocation")
)
case v if v.startsWith("1.4") =>