[SPARK-2635] Fix race condition at SchedulerBackend.isReady in standalone mode
In SPARK-1946 (PR #900), the configuration <code>spark.scheduler.minRegisteredExecutorsRatio</code> was introduced. However, in standalone mode, there is a race condition where isReady() can return true before totalExpectedExecutors has been correctly set. Because the number of expected executors is uncertain in standalone mode, this PR tries to use CPU cores (<code>--total-executor-cores</code>) as the expected resources to judge whether the SchedulerBackend is ready. Author: li-zhihui <zhihui.li@intel.com> Author: Li Zhihui <zhihui.li@intel.com> Closes #1525 from li-zhihui/fixre4s and squashes the following commits: e9a630b [Li Zhihui] Rename variable totalExecutors and clean codes abf4860 [Li Zhihui] Push down variable totalExpectedResources to children classes ca54bd9 [li-zhihui] Format log with String interpolation 88c7dc6 [li-zhihui] Few codes and docs refactor 41cf47e [li-zhihui] Fix race condition at SchedulerBackend.isReady in standalone mode
This commit is contained in:
parent
43af281700
commit
28dbae85aa
|
@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
|
|||
{
|
||||
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
|
||||
var totalCoreCount = new AtomicInteger(0)
|
||||
var totalExpectedExecutors = new AtomicInteger(0)
|
||||
var totalRegisteredExecutors = new AtomicInteger(0)
|
||||
val conf = scheduler.sc.conf
|
||||
private val timeout = AkkaUtils.askTimeout(conf)
|
||||
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
|
||||
// Submit tasks only after (registered executors / total expected executors)
|
||||
// Submit tasks only after (registered resources / total expected resources)
|
||||
// is equal to at least this value, that is double between 0 and 1.
|
||||
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
|
||||
if (minRegisteredRatio > 1) minRegisteredRatio = 1
|
||||
// Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
|
||||
var minRegisteredRatio =
|
||||
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
|
||||
// Submit tasks after maxRegisteredWaitingTime milliseconds
|
||||
// if minRegisteredRatio has not yet been reached
|
||||
val maxRegisteredWaitingTime =
|
||||
conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
|
||||
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
|
||||
val createTime = System.currentTimeMillis()
|
||||
var ready = if (minRegisteredRatio <= 0) true else false
|
||||
|
||||
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
|
||||
private val executorActor = new HashMap[String, ActorRef]
|
||||
|
@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
|
|||
executorAddress(executorId) = sender.path.address
|
||||
addressToExecutorId(sender.path.address) = executorId
|
||||
totalCoreCount.addAndGet(cores)
|
||||
if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
|
||||
ready = true
|
||||
logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
|
||||
executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
|
||||
", minRegisteredExecutorsRatio: " + minRegisteredRatio)
|
||||
}
|
||||
totalRegisteredExecutors.addAndGet(1)
|
||||
makeOffers()
|
||||
}
|
||||
|
||||
|
@ -268,14 +263,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
|
|||
}
|
||||
}
|
||||
|
||||
def sufficientResourcesRegistered(): Boolean = true
|
||||
|
||||
override def isReady(): Boolean = {
|
||||
if (ready) {
|
||||
if (sufficientResourcesRegistered) {
|
||||
logInfo("SchedulerBackend is ready for scheduling beginning after " +
|
||||
s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
|
||||
return true
|
||||
}
|
||||
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
|
||||
ready = true
|
||||
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
|
||||
"maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
|
||||
s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
|
||||
return true
|
||||
}
|
||||
false
|
||||
|
|
|
@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
|
|||
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
|
||||
|
||||
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
|
||||
val totalExpectedCores = maxCores.getOrElse(0)
|
||||
|
||||
override def start() {
|
||||
super.start()
|
||||
|
@ -97,7 +98,6 @@ private[spark] class SparkDeploySchedulerBackend(
|
|||
|
||||
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
|
||||
memory: Int) {
|
||||
totalExpectedExecutors.addAndGet(1)
|
||||
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
|
||||
fullId, hostPort, cores, Utils.megabytesToString(memory)))
|
||||
}
|
||||
|
@ -110,4 +110,8 @@ private[spark] class SparkDeploySchedulerBackend(
|
|||
logInfo("Executor %s removed: %s".format(fullId, message))
|
||||
removeExecutor(fullId.split("/")(1), reason.toString)
|
||||
}
|
||||
|
||||
override def sufficientResourcesRegistered(): Boolean = {
|
||||
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
|
||||
}
|
||||
}
|
||||
|
|
|
@ -825,21 +825,22 @@ Apart from these, the following properties are also available, and may be useful
|
|||
</td>
|
||||
</tr>
|
||||
</tr>
|
||||
<td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
|
||||
<td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
|
||||
<td>0</td>
|
||||
<td>
|
||||
The minimum ratio of registered executors (registered executors / total expected executors)
|
||||
The minimum ratio of registered resources (registered resources / total expected resources)
|
||||
(resources are executors in yarn mode, CPU cores in standalone mode)
|
||||
to wait for before scheduling begins. Specified as a double between 0 and 1.
|
||||
Regardless of whether the minimum ratio of executors has been reached,
|
||||
Regardless of whether the minimum ratio of resources has been reached,
|
||||
the maximum amount of time it will wait before scheduling begins is controlled by config
|
||||
<code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>
|
||||
<code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
|
||||
<td><code>spark.scheduler.maxRegisteredResourcesWaitingTime</code></td>
|
||||
<td>30000</td>
|
||||
<td>
|
||||
Maximum amount of time to wait for executors to register before scheduling begins
|
||||
Maximum amount of time to wait for resources to register before scheduling begins
|
||||
(in milliseconds).
|
||||
</td>
|
||||
</tr>
|
||||
|
|
|
@ -30,15 +30,15 @@ private[spark] class YarnClientSchedulerBackend(
|
|||
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
|
||||
with Logging {
|
||||
|
||||
if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
|
||||
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
|
||||
minRegisteredRatio = 0.8
|
||||
ready = false
|
||||
}
|
||||
|
||||
var client: Client = null
|
||||
var appId: ApplicationId = null
|
||||
var checkerThread: Thread = null
|
||||
var stopping: Boolean = false
|
||||
var totalExpectedExecutors = 0
|
||||
|
||||
private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
|
||||
arrayBuf: ArrayBuffer[String]) {
|
||||
|
@ -84,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(
|
|||
|
||||
logDebug("ClientArguments called with: " + argsArrayBuf)
|
||||
val args = new ClientArguments(argsArrayBuf.toArray, conf)
|
||||
totalExpectedExecutors.set(args.numExecutors)
|
||||
totalExpectedExecutors = args.numExecutors
|
||||
client = new Client(args, conf)
|
||||
appId = client.runApp()
|
||||
waitForApp()
|
||||
|
@ -150,4 +150,7 @@ private[spark] class YarnClientSchedulerBackend(
|
|||
logInfo("Stopped")
|
||||
}
|
||||
|
||||
override def sufficientResourcesRegistered(): Boolean = {
|
||||
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,19 +27,24 @@ private[spark] class YarnClusterSchedulerBackend(
|
|||
sc: SparkContext)
|
||||
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
|
||||
|
||||
if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
|
||||
var totalExpectedExecutors = 0
|
||||
|
||||
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
|
||||
minRegisteredRatio = 0.8
|
||||
ready = false
|
||||
}
|
||||
|
||||
override def start() {
|
||||
super.start()
|
||||
var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
|
||||
totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
|
||||
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
|
||||
numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
|
||||
totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
|
||||
.getOrElse(totalExpectedExecutors)
|
||||
}
|
||||
// System property can override environment variable.
|
||||
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
|
||||
totalExpectedExecutors.set(numExecutors)
|
||||
totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
|
||||
}
|
||||
|
||||
override def sufficientResourcesRegistered(): Boolean = {
|
||||
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue