[SPARK-20286][CORE] Improve logic for timing out executors in dynamic allocation.

This change refactors the portions of the ExecutorAllocationManager class that
track executor state into a new class, to achieve a few goals:

- make the code easier to understand
- better separate concerns (task backlog vs. executor state)
- less synchronization between event and allocation threads
- less coupling between the allocation code and executor state tracking

The executor tracking code was moved to a new class (ExecutorMonitor) that
encapsulates all the logic of tracking what happens to executors and when
they can be timed out. The logic to actually remove the executors remains
in the EAM, since it still requires information that is not tracked by the
new executor monitor code.
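
A rough sketch of the resulting division of labor, condensed from the code in this diff
(method names match the new ExecutorMonitor API; locking and error handling omitted):

    // Runs periodically on the allocation thread, inside ExecutorAllocationManager.schedule():
    val executorIdsToBeRemoved = executorMonitor.timedOutExecutors()  // monitor decides *when*
    if (executorIdsToBeRemoved.nonEmpty) {
      removeExecutors(executorIdsToBeRemoved)  // EAM decides *whether*, and asks the cluster manager
    }
    // Once the cluster manager acknowledges the kill request, the EAM informs the monitor,
    // so those executors stop being reported as timed out:
    executorMonitor.executorsKilled(executorsRemoved)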

Of particular interest in the executor monitor is a change in how cached blocks are
tracked: instead of polling the block manager, the monitor now uses events to track
which executors have cached blocks, and it can also detect unpersist events and adjust
the time when the executor should be removed accordingly. (That's the bug mentioned in
the PR title.)
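
A minimal sketch of the resulting timeout selection, condensed from Tracker.updateTimeout()
in the new ExecutorMonitor further down (field names as in this diff; the real code also
coordinates with the shared nextTimeout value):

    // The monitor is a SparkListener: onBlockUpdated / onUnpersistRDD maintain `cachedBlocks`
    // from listener events and then call updateTimeout(), so no polling of the block manager
    // is needed and an unpersist can shorten the removal deadline.
    def updateTimeout(): Unit = {
      timeoutAt = if (idleStart >= 0) {
        idleStart + (if (cachedBlocks.nonEmpty) storageTimeoutMs else idleTimeoutMs)
      } else {
        Long.MaxValue  // executor is running tasks; it never times out while busy
      }
    }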

Because of the refactoring, a few tests in the old EAM test suite were removed, since
they're now covered by the newly added test suite. The EAM suite was also changed so
that it no longer instantiates a SparkContext for every test; this allowed some cleanup
and makes the tests run faster.

Tested with new and updated unit tests, and with multiple TPC-DS workloads
running with dynamic allocation on; also some manual tests for the caching
behavior.

Closes #24704 from vanzin/SPARK-20286.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
Marcelo Vanzin 2019-06-05 08:09:44 -05:00 committed by Imran Rashid
parent fcb3fb04c5
commit b312033bd3
15 changed files with 823 additions and 791 deletions


@ -23,10 +23,17 @@ package org.apache.spark
*/
private[spark] trait ExecutorAllocationClient {
/** Get the list of currently active executors */
private[spark] def getExecutorIds(): Seq[String]
/**
* Whether an executor is active. An executor is active when it can be used to execute tasks
* for jobs submitted by the application.
*
* @return whether the executor with the given ID is currently active.
*/
def isExecutorActive(id: String): Boolean
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.


@ -30,7 +30,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
import org.apache.spark.scheduler.dynalloc.ExecutorMonitor
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
/**
@ -94,7 +94,7 @@ private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
conf: SparkConf,
blockManagerMaster: BlockManagerMaster)
clock: Clock = new SystemClock())
extends Logging {
allocationManager =>
@ -113,11 +113,6 @@ private[spark] class ExecutorAllocationManager(
private val sustainedSchedulerBacklogTimeoutS =
conf.get(DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT)
// How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeoutS = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
private val cachedExecutorIdleTimeoutS = conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.get(DYN_ALLOCATION_TESTING)
@ -139,20 +134,10 @@ private[spark] class ExecutorAllocationManager(
// is the number of executors we would immediately want from the cluster manager.
private var numExecutorsTarget = initialNumExecutors
// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
// All known executors
private val executorIds = new mutable.HashSet[String]
// A timestamp of when an addition should be triggered, or NOT_SET if it is not set
// This is set when pending tasks are added but not scheduled yet
private var addTime: Long = NOT_SET
// A timestamp for each executor of when the executor should be removed, indexed by the ID
// This is set when an executor is no longer running a task, or when it first registers
private val removeTimes = new mutable.HashMap[String, Long]
// Polling loop interval (ms)
private val intervalMillis: Long = if (Utils.isTesting) {
conf.get(TEST_SCHEDULE_INTERVAL)
@ -160,12 +145,11 @@ private[spark] class ExecutorAllocationManager(
100
}
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new SystemClock()
// Listener for Spark events that impact the allocation policy
val listener = new ExecutorAllocationListener
val executorMonitor = new ExecutorMonitor(conf, client, clock)
// Executor that handles the scheduling task.
private val executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
@ -210,12 +194,6 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (executorIdleTimeoutS < 0) {
throw new SparkException(s"${DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
}
if (cachedExecutorIdleTimeoutS < 0) {
throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
@ -229,19 +207,13 @@ private[spark] class ExecutorAllocationManager(
}
}
/**
* Use a different clock for this allocation manager. This is mainly used for testing.
*/
def setClock(newClock: Clock): Unit = {
clock = newClock
}
/**
* Register for scheduler callbacks to decide when to add and remove executors, and start
* the scheduling task.
*/
def start(): Unit = {
listenerBus.addToManagementQueue(listener)
listenerBus.addToManagementQueue(executorMonitor)
val scheduleTask = new Runnable() {
override def run(): Unit = {
@ -278,8 +250,7 @@ private[spark] class ExecutorAllocationManager(
def reset(): Unit = synchronized {
addTime = 0L
numExecutorsTarget = initialNumExecutors
executorsPendingToRemove.clear()
removeTimes.clear()
executorMonitor.reset()
}
/**
@ -307,19 +278,13 @@ private[spark] class ExecutorAllocationManager(
* This is factored out into its own method for testing.
*/
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
executorIdsToBeRemoved += executorId
}
!expired
val executorIdsToBeRemoved = executorMonitor.timedOutExecutors()
if (executorIdsToBeRemoved.nonEmpty) {
initializing = false
}
// Update executor target number only after initializing flag is unset
updateAndSyncNumExecutorsTarget(now)
updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
@ -395,7 +360,7 @@ private[spark] class ExecutorAllocationManager(
val oldNumExecutorsTarget = numExecutorsTarget
// There's no point in wasting time ramping up to the number of executors we already have, so
// make sure our target is at least as much as our current allocation:
numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
numExecutorsTarget = math.max(numExecutorsTarget, executorMonitor.executorCount)
// Boost our target with the number to add for this round:
numExecutorsTarget += numExecutorsToAdd
// Ensure that our target doesn't exceed what we need at the present moment:
@ -455,7 +420,7 @@ private[spark] class ExecutorAllocationManager(
val executorIdsToBeRemoved = new ArrayBuffer[String]
logInfo("Request to remove executorIds: " + executors.mkString(", "))
val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size
val numExistingExecutors = executorMonitor.executorCount - executorMonitor.pendingRemovalCount
var newExecutorTotal = numExistingExecutors
executors.foreach { executorIdToBeRemoved =>
@ -465,7 +430,7 @@ private[spark] class ExecutorAllocationManager(
} else if (newExecutorTotal - 1 < numExecutorsTarget) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
} else if (canBeKilled(executorIdToBeRemoved)) {
} else {
executorIdsToBeRemoved += executorIdToBeRemoved
newExecutorTotal -= 1
}
@ -484,24 +449,17 @@ private[spark] class ExecutorAllocationManager(
client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
countFailures = false, force = false)
}
// [SPARK-21834] killExecutors api reduces the target number of executors.
// So we need to update the target with desired value.
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
// reset the newExecutorTotal to the existing number of executors
newExecutorTotal = numExistingExecutors
if (testing || executorsRemoved.nonEmpty) {
executorsRemoved.foreach { removedExecutorId =>
// If it has an exclusive cached block then cachedExecutorIdleTimeoutS is used for timeout
val idleTimeout = if (blockManagerMaster.hasExclusiveCachedBlocks(removedExecutorId)) {
cachedExecutorIdleTimeoutS
} else {
executorIdleTimeoutS
}
newExecutorTotal -= 1
logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
s"$idleTimeout seconds (new desired total will be $newExecutorTotal)")
executorsPendingToRemove.add(removedExecutorId)
}
newExecutorTotal -= executorsRemoved.size
executorMonitor.executorsKilled(executorsRemoved)
logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout." +
s"(new desired total will be $newExecutorTotal)")
executorsRemoved
} else {
logWarning(s"Unable to reach the cluster manager to kill executor/s " +
@ -510,70 +468,6 @@ private[spark] class ExecutorAllocationManager(
}
}
/**
* Request the cluster manager to remove the given executor.
* Return whether the request is acknowledged.
*/
private def removeExecutor(executorId: String): Boolean = synchronized {
val executorsRemoved = removeExecutors(Seq(executorId))
executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
}
/**
* Determine if the given executor can be killed.
*/
private def canBeKilled(executorId: String): Boolean = synchronized {
// Do not kill the executor if we are not aware of it (should never happen)
if (!executorIds.contains(executorId)) {
logWarning(s"Attempted to remove unknown executor $executorId!")
return false
}
// Do not kill the executor again if it is already pending to be killed (should never happen)
if (executorsPendingToRemove.contains(executorId)) {
logWarning(s"Attempted to remove executor $executorId " +
s"when it is already pending to be removed!")
return false
}
true
}
/**
* Callback invoked when the specified executor has been added.
*/
private def onExecutorAdded(executorId: String): Unit = synchronized {
if (!executorIds.contains(executorId)) {
executorIds.add(executorId)
// If an executor (call this executor X) is not removed because the lower bound
// has been reached, it will no longer be marked as idle. When new executors join,
// however, we are no longer at the lower bound, and so we must mark executor X
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
} else {
logWarning(s"Duplicate executor $executorId has registered")
}
}
/**
* Callback invoked when the specified executor has been removed.
*/
private def onExecutorRemoved(executorId: String): Unit = synchronized {
if (executorIds.contains(executorId)) {
executorIds.remove(executorId)
removeTimes.remove(executorId)
logInfo(s"Existing executor $executorId has been removed (new total is ${executorIds.size})")
if (executorsPendingToRemove.contains(executorId)) {
executorsPendingToRemove.remove(executorId)
logDebug(s"Executor $executorId is no longer pending to " +
s"be removed (${executorsPendingToRemove.size} left)")
}
} else {
logWarning(s"Unknown executor $executorId has been removed!")
}
}
/**
* Callback invoked when the scheduler receives new pending tasks.
* This sets a time in the future that decides when executors should be added
@ -597,46 +491,6 @@ private[spark] class ExecutorAllocationManager(
numExecutorsToAdd = 1
}
/**
* Callback invoked when the specified executor is no longer running any tasks.
* This sets a time in the future that decides when this executor should be removed if
* the executor is not already marked as idle.
*/
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
// Note that it is not necessary to query the executors since all the cached blocks we are
// concerned with are reported to the driver. This does not include broadcast blocks and
// non-exclusive blocks which are also available via the external shuffle service.
val hasCachedBlocks = blockManagerMaster.hasExclusiveCachedBlocks(executorId)
val now = clock.getTimeMillis()
val timeout = {
if (hasCachedBlocks) {
// Use a different timeout if the executor has cached blocks.
now + cachedExecutorIdleTimeoutS * 1000
} else {
now + executorIdleTimeoutS * 1000
}
}
val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
removeTimes(executorId) = realTimeout
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
}
}
/**
* Callback invoked when the specified executor is now running a task.
* This resets all variables used for removing this executor.
*/
private def onExecutorBusy(executorId: String): Unit = synchronized {
logDebug(s"Clearing idle timer for $executorId because it is now running a task")
removeTimes.remove(executorId)
}
/**
* A listener that notifies the given allocation manager of when to add and remove executors.
*
@ -650,7 +504,6 @@ private[spark] class ExecutorAllocationManager(
// Should be 0 when no stages are active.
private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
// Number of speculative tasks to be scheduled in each stage
private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
// The speculative tasks started in each stage
@ -716,23 +569,11 @@ private[spark] class ExecutorAllocationManager(
val stageId = taskStart.stageId
val taskId = taskStart.taskInfo.taskId
val taskIndex = taskStart.taskInfo.index
val executorId = taskStart.taskInfo.executorId
allocationManager.synchronized {
if (stageIdToNumRunningTask.contains(stageId)) {
stageIdToNumRunningTask(stageId) += 1
}
// This guards against the following race condition:
// 1. The `SparkListenerTaskStart` event is posted before the
// `SparkListenerExecutorAdded` event
// 2. The `SparkListenerExecutorRemoved` event is posted before the
// `SparkListenerTaskStart` event
// Above cases are possible because these events are posted in different threads.
// (see SPARK-4951 SPARK-26927)
if (!allocationManager.executorIds.contains(executorId) &&
client.getExecutorIds().contains(executorId)) {
allocationManager.onExecutorAdded(executorId)
}
// If this is the last pending task, mark the scheduler queue as empty
if (taskStart.taskInfo.speculative) {
@ -744,15 +585,10 @@ private[spark] class ExecutorAllocationManager(
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerQueueEmpty()
}
// Mark the executor on which this task is scheduled as busy
executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
allocationManager.onExecutorBusy(executorId)
}
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
val taskIndex = taskEnd.taskInfo.index
val stageId = taskEnd.stageId
@ -760,14 +596,6 @@ private[spark] class ExecutorAllocationManager(
if (stageIdToNumRunningTask.contains(stageId)) {
stageIdToNumRunningTask(stageId) -= 1
}
// If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
executorIdToTaskIds -= executorId
allocationManager.onExecutorIdle(executorId)
}
}
// If the task failed, we expect it to be resubmitted later. To ensure we have
// enough resources to run the resubmitted task, we need to mark the scheduler
@ -785,22 +613,6 @@ private[spark] class ExecutorAllocationManager(
}
}
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
val executorId = executorAdded.executorId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
if (!allocationManager.executorIds.contains(executorId)) {
allocationManager.onExecutorAdded(executorId)
}
}
}
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
allocationManager.onExecutorRemoved(executorRemoved.executorId)
}
override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
: Unit = {
val stageId = speculativeTask.stageId
@ -841,15 +653,6 @@ private[spark] class ExecutorAllocationManager(
stageIdToNumRunningTask.values.sum
}
/**
* Return true if an executor is not currently running a task, and false otherwise.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def isExecutorIdle(executorId: String): Boolean = {
!executorIdToTaskIds.contains(executorId)
}
/**
* Update the Executor placement hints (the number of tasks with locality preferences,
* a map where each pair is a node and the number of tasks that would like to be scheduled
@ -892,8 +695,8 @@ private[spark] class ExecutorAllocationManager(
}
registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0)
registerGauge("numberAllExecutors", executorIds.size, 0)
registerGauge("numberExecutorsPendingToRemove", executorMonitor.pendingRemovalCount, 0)
registerGauge("numberAllExecutors", executorMonitor.executorCount, 0)
registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
}


@ -585,8 +585,7 @@ class SparkContext(config: SparkConf) extends Logging {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}


@ -357,11 +357,15 @@ package object config {
private[spark] val DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout")
.timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE)
.timeConf(TimeUnit.SECONDS)
.checkValue(_ >= 0L, "Timeout must be >= 0.")
.createWithDefault(Integer.MAX_VALUE)
private[spark] val DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.executorIdleTimeout")
.timeConf(TimeUnit.SECONDS).createWithDefault(60)
.timeConf(TimeUnit.SECONDS)
.checkValue(_ >= 0L, "Timeout must be >= 0.")
.createWithDefault(60)
private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout")


@ -540,6 +540,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.keySet.toSeq
}
override def isExecutorActive(id: String): Boolean = synchronized {
executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
}
override def maxNumConcurrentTasks(): Int = {
executorDataMap.values.map { executor =>
executor.totalCores / scheduler.CPUS_PER_TASK


@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.dynalloc
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.scheduler._
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Clock
/**
* A monitor for executor activity, used by ExecutorAllocationManager to detect idle executors.
*/
private[spark] class ExecutorMonitor(
conf: SparkConf,
client: ExecutorAllocationClient,
clock: Clock) extends SparkListener with Logging {
private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(
conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(
conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT))
private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) &&
conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
private val executors = new ConcurrentHashMap[String, Tracker]()
// The following fields are an optimization to avoid having to scan all executors on every EAM
// schedule interval to find out which ones are timed out. They keep track of when the next
// executor timeout is expected to happen, and the current list of timed out executors. There's
// also a flag that forces the EAM task to recompute the timed out executors, in case some event
// arrives on the listener bus that may cause the current list of timed out executors to change.
//
// There's also per-executor state used for this purpose, so that recomputations can be triggered
// only when really necessary.
//
// Note that this isn't meant to, and cannot, always make the right decision about which executors
// are indeed timed out. For example, the EAM thread may detect a timed out executor while a new
// "task start" event has just been posted to the listener bus and hasn't yet been delivered to
// this listener. There are safeguards in other parts of the code that would prevent that executor
// from being removed.
private val nextTimeout = new AtomicLong(Long.MaxValue)
private var timedOutExecs = Seq.empty[String]
def reset(): Unit = {
executors.clear()
nextTimeout.set(Long.MaxValue)
timedOutExecs = Nil
}
/**
* Returns the list of executors that are currently considered to be timed out.
* Should only be called from the EAM thread.
*/
def timedOutExecutors(): Seq[String] = {
val now = clock.getTimeMillis()
if (now >= nextTimeout.get()) {
// Temporarily set the next timeout at Long.MaxValue. This ensures that after
// scanning all executors below, we know when the next timeout for non-timed out
// executors is (whether that update came from the scan, or from a new event
// arriving in a different thread).
nextTimeout.set(Long.MaxValue)
var newNextTimeout = Long.MaxValue
timedOutExecs = executors.asScala
.filter { case (_, exec) => !exec.pendingRemoval }
.filter { case (_, exec) =>
val deadline = exec.timeoutAt
if (deadline > now) {
newNextTimeout = math.min(newNextTimeout, deadline)
exec.timedOut = false
false
} else {
exec.timedOut = true
true
}
}
.keys
.toSeq
updateNextTimeout(newNextTimeout)
}
timedOutExecs
}
/**
* Mark the given executors as pending to be removed. Should only be called in the EAM thread.
*/
def executorsKilled(ids: Seq[String]): Unit = {
ids.foreach { id =>
val tracker = executors.get(id)
if (tracker != null) {
tracker.pendingRemoval = true
}
}
// Recompute timed out executors in the next EAM callback, since this call invalidates
// the current list.
nextTimeout.set(Long.MinValue)
}
def executorCount: Int = executors.size()
def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval }
override def onTaskStart(event: SparkListenerTaskStart): Unit = {
val executorId = event.taskInfo.executorId
// Guard against a late arriving task start event (SPARK-26927).
if (client.isExecutorActive(executorId)) {
val exec = ensureExecutorIsTracked(executorId)
exec.updateRunningTasks(1)
}
}
override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
val executorId = event.taskInfo.executorId
val exec = executors.get(executorId)
if (exec != null) {
exec.updateRunningTasks(-1)
}
}
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
val exec = ensureExecutorIsTracked(event.executorId)
exec.updateRunningTasks(0)
logInfo(s"New executor ${event.executorId} has registered (new total is ${executors.size()})")
}
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
val removed = executors.remove(event.executorId)
if (removed != null) {
logInfo(s"Executor ${event.executorId} removed (new total is ${executors.size()})")
if (!removed.pendingRemoval) {
nextTimeout.set(Long.MinValue)
}
}
}
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
return
}
val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId)
val storageLevel = event.blockUpdatedInfo.storageLevel
val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]
// SPARK-27677. When a block can be fetched from the external shuffle service, the executor can
// be removed without hurting the application too much, since the cached data is still
// available. So don't count blocks that can be served by the external service.
if (storageLevel.isValid && (!fetchFromShuffleSvcEnabled || !storageLevel.useDisk)) {
val hadCachedBlocks = exec.cachedBlocks.nonEmpty
val blocks = exec.cachedBlocks.getOrElseUpdate(blockId.rddId,
new mutable.BitSet(blockId.splitIndex))
blocks += blockId.splitIndex
if (!hadCachedBlocks) {
exec.updateTimeout()
}
} else {
exec.cachedBlocks.get(blockId.rddId).foreach { blocks =>
blocks -= blockId.splitIndex
if (blocks.isEmpty) {
exec.cachedBlocks -= blockId.rddId
if (exec.cachedBlocks.isEmpty) {
exec.updateTimeout()
}
}
}
}
}
override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
executors.values().asScala.foreach { exec =>
exec.cachedBlocks -= event.rddId
if (exec.cachedBlocks.isEmpty) {
exec.updateTimeout()
}
}
}
// Visible for testing.
private[dynalloc] def isExecutorIdle(id: String): Boolean = {
Option(executors.get(id)).map(_.isIdle).getOrElse(throw new NoSuchElementException(id))
}
// Visible for testing
private[dynalloc] def timedOutExecutors(when: Long): Seq[String] = {
executors.asScala.flatMap { case (id, tracker) =>
if (tracker.timeoutAt <= when) Some(id) else None
}.toSeq
}
// Visible for testing
def executorsPendingToRemove(): Set[String] = {
executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet
}
/**
* This method should be used when updating executor state. It guards against a race condition in
* which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
* event, which is possible because these events are posted in different threads. (see SPARK-4951)
*/
private def ensureExecutorIsTracked(id: String): Tracker = {
executors.computeIfAbsent(id, _ => new Tracker())
}
private def updateNextTimeout(newValue: Long): Unit = {
while (true) {
val current = nextTimeout.get()
if (newValue >= current || nextTimeout.compareAndSet(current, newValue)) {
return
}
}
}
private class Tracker {
@volatile var timeoutAt: Long = Long.MaxValue
// Tracks whether this executor is thought to be timed out. It's used to detect when the list
// of timed out executors needs to be updated due to the executor's state changing.
@volatile var timedOut: Boolean = false
var pendingRemoval: Boolean = false
private var idleStart: Long = -1
private var runningTasks: Int = 0
// Maps RDD IDs to the partition IDs stored in the executor.
// This should only be used in the event thread.
val cachedBlocks = new mutable.HashMap[Int, mutable.BitSet]()
// For testing.
def isIdle: Boolean = idleStart >= 0
def updateRunningTasks(delta: Int): Unit = {
runningTasks = math.max(0, runningTasks + delta)
idleStart = if (runningTasks == 0) clock.getTimeMillis() else -1L
updateTimeout()
}
def updateTimeout(): Unit = {
val oldDeadline = timeoutAt
val newDeadline = if (idleStart >= 0) {
idleStart + (if (cachedBlocks.nonEmpty) storageTimeoutMs else idleTimeoutMs)
} else {
Long.MaxValue
}
timeoutAt = newDeadline
// If the executor was thought to be timed out, but the new deadline is later than the
// old one, ask the EAM thread to update the list of timed out executors.
if (newDeadline > oldDeadline && timedOut) {
nextTimeout.set(Long.MinValue)
} else {
updateNextTimeout(newDeadline)
}
}
}
}


@ -222,15 +222,6 @@ class BlockManagerMaster(
timeout.awaitResult(future)
}
/**
* Find out if the executor has cached blocks which are not available via the external shuffle
* service.
* This method does not consider broadcast blocks, since they are not reported to the master.
*/
def hasExclusiveCachedBlocks(executorId: String): Boolean = {
driverEndpoint.askSync[Boolean](HasExclusiveCachedBlocks(executorId))
}
/** Stop the driver endpoint, called only on the Spark driver node */
def stop() {
if (driverEndpoint != null && isDriver) {


@ -147,18 +147,6 @@ class BlockManagerMasterEndpoint(
case BlockManagerHeartbeat(blockManagerId) =>
context.reply(heartbeatReceived(blockManagerId))
case HasExclusiveCachedBlocks(executorId) =>
blockManagerIdByExecutor.get(executorId) match {
case Some(bm) =>
if (blockManagerInfo.contains(bm)) {
val bmInfo = blockManagerInfo(bm)
context.reply(bmInfo.exclusiveCachedBlocks.nonEmpty)
} else {
context.reply(false)
}
case None => context.reply(false)
}
}
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
@ -589,12 +577,6 @@ private[spark] class BlockManagerInfo(
// Mapping from block id to its status.
private val _blocks = new JHashMap[BlockId, BlockStatus]
/**
* Cached blocks which are not available via the external shuffle service.
* This does not include broadcast blocks.
*/
private val _exclusiveCachedBlocks = new mutable.HashSet[BlockId]
def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
def updateLastSeenMs() {
@ -662,15 +644,6 @@ private[spark] class BlockManagerInfo(
}
}
if (!blockId.isBroadcast) {
if (!externalShuffleServiceEnabled || !storageLevel.useDisk) {
_exclusiveCachedBlocks += blockId
} else if (blockExists) {
// removing block from the exclusive cached blocks when updated to non-exclusive
_exclusiveCachedBlocks -= blockId
}
}
externalShuffleServiceBlockStatus.foreach { shuffleServiceBlocks =>
if (!blockId.isBroadcast && blockStatus.diskSize > 0) {
shuffleServiceBlocks.put(blockId, blockStatus)
@ -679,7 +652,6 @@ private[spark] class BlockManagerInfo(
} else if (blockExists) {
// If isValid is not true, drop the block.
_blocks.remove(blockId)
_exclusiveCachedBlocks -= blockId
externalShuffleServiceBlockStatus.foreach { blockStatus =>
blockStatus.remove(blockId)
}
@ -703,7 +675,6 @@ private[spark] class BlockManagerInfo(
blockStatus.remove(blockId)
}
}
_exclusiveCachedBlocks -= blockId
}
def remainingMem: Long = _remainingMem
@ -712,8 +683,6 @@ private[spark] class BlockManagerInfo(
def blocks: JHashMap[BlockId, BlockStatus] = _blocks
def exclusiveCachedBlocks: collection.Set[BlockId] = _exclusiveCachedBlocks
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {


@ -122,8 +122,5 @@ private[spark] object BlockManagerMessages {
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
case class HasExclusiveCachedBlocks(executorId: String) extends ToBlockManagerMaster
case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster
}


@ -1,4 +1,3 @@
org.apache.spark.scheduler.DummyExternalClusterManager
org.apache.spark.scheduler.MockExternalClusterManager
org.apache.spark.DummyLocalExternalClusterManager
org.apache.spark.scheduler.CSMockExternalClusterManager


@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
import com.google.common.io.CharStreams
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
import org.apache.spark.network.shuffle.TestShuffleDataContext
import org.apache.spark.util.Utils
@ -70,10 +71,12 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
}
}
def shuffleServiceConf: SparkConf = sparkConf.clone().set(SHUFFLE_SERVICE_PORT, 0)
def registerExecutor(): Unit = {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
externalShuffleService = new ExternalShuffleService(shuffleServiceConf, securityManager)
// external Shuffle Service start
externalShuffleService.start()
@ -94,7 +97,7 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
"shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
externalShuffleService = new ExternalShuffleService(shuffleServiceConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
@ -120,7 +123,7 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
" shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "false")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
externalShuffleService = new ExternalShuffleService(shuffleServiceConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler


@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.dynalloc
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}
import org.apache.spark._
import org.apache.spark.internal.config._
import org.apache.spark.scheduler._
import org.apache.spark.storage._
import org.apache.spark.util.ManualClock
class ExecutorMonitorSuite extends SparkFunSuite {
private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(60L)
private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(120L)
private val conf = new SparkConf()
.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s")
.set(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, "120s")
private var monitor: ExecutorMonitor = _
private var client: ExecutorAllocationClient = _
private var clock: ManualClock = _
// List of known executors. Allows easily mocking which executors are alive without
// having to use mockito APIs directly in each test.
private val knownExecs = mutable.HashSet[String]()
override def beforeEach(): Unit = {
super.beforeEach()
knownExecs.clear()
clock = new ManualClock()
client = mock(classOf[ExecutorAllocationClient])
when(client.isExecutorActive(any())).thenAnswer { invocation =>
knownExecs.contains(invocation.getArguments()(0).asInstanceOf[String])
}
monitor = new ExecutorMonitor(conf, client, clock)
}
test("basic executor timeout") {
knownExecs += "1"
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
assert(monitor.executorCount === 1)
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
}
test("SPARK-4951, SPARK-26927: handle out of order task start events") {
knownExecs ++= Set("1", "2")
monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
assert(monitor.executorCount === 1)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
assert(monitor.executorCount === 1)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
assert(monitor.executorCount === 2)
monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "2", null))
assert(monitor.executorCount === 1)
knownExecs -= "2"
monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("2", 2)))
assert(monitor.executorCount === 1)
}
test("track tasks running on executor") {
knownExecs += "1"
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
assert(!monitor.isExecutorIdle("1"))
// Start/end a few tasks and make sure the executor does not go idle.
(2 to 10).foreach { i =>
monitor.onTaskStart(SparkListenerTaskStart(i, 1, taskInfo("1", 1)))
assert(!monitor.isExecutorIdle("1"))
monitor.onTaskEnd(SparkListenerTaskEnd(i, 1, "foo", Success, taskInfo("1", 1), null))
assert(!monitor.isExecutorIdle("1"))
}
monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, taskInfo("1", 1), null))
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(clock.getTimeMillis()).isEmpty)
assert(monitor.timedOutExecutors(clock.getTimeMillis() + idleTimeoutMs + 1) === Seq("1"))
}
test("use appropriate time out depending on whether blocks are stored") {
knownExecs += "1"
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.NONE))
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
assert(!monitor.isExecutorIdle("1"))
monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
assert(!monitor.isExecutorIdle("1"))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.NONE))
assert(!monitor.isExecutorIdle("1"))
}
test("keeps track of stored blocks for each rdd and split") {
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
monitor.onBlockUpdated(rddUpdate(1, 1, "1"))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
monitor.onBlockUpdated(rddUpdate(2, 0, "1"))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.NONE))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
monitor.onUnpersistRDD(SparkListenerUnpersistRDD(1))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
// Make sure that if we get an unpersist event much later, which moves an executor from having
// cached blocks to no longer having cached blocks, it will time out based on the time it
// originally went idle.
clock.setTime(idleDeadline)
monitor.onUnpersistRDD(SparkListenerUnpersistRDD(2))
assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
}
test("handle timeouts correctly with multiple executors") {
knownExecs ++= Set("1", "2", "3")
// start exec 1 at 0s (should idle time out at 60s)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
assert(monitor.isExecutorIdle("1"))
// start exec 2 at 30s, store a block (should idle time out at 150s)
clock.setTime(TimeUnit.SECONDS.toMillis(30))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
monitor.onBlockUpdated(rddUpdate(1, 0, "2"))
assert(monitor.isExecutorIdle("2"))
assert(!monitor.timedOutExecutors(idleDeadline).contains("2"))
// start exec 3 at 60s (should idle timeout at 120s, exec 1 should time out)
clock.setTime(TimeUnit.SECONDS.toMillis(60))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
// store block on exec 3 (should now idle time out at 180s)
monitor.onBlockUpdated(rddUpdate(1, 0, "3"))
assert(monitor.isExecutorIdle("3"))
assert(!monitor.timedOutExecutors(idleDeadline).contains("3"))
// advance to 140s, remove block from exec 3 (time out immediately)
clock.setTime(TimeUnit.SECONDS.toMillis(140))
monitor.onBlockUpdated(rddUpdate(1, 0, "3", level = StorageLevel.NONE))
assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", "3"))
// advance to 150s, now exec 2 should time out
clock.setTime(TimeUnit.SECONDS.toMillis(150))
assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", "2", "3"))
}
test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
// First make sure that blocks on disk are counted when no shuffle service is available.
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.DISK_ONLY))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
monitor = new ExecutorMonitor(conf, client, clock)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY))
monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.MEMORY_ONLY))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.DISK_ONLY))
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.DISK_ONLY))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
// Tag the block as being both in memory and on disk, which may happen after it was
// evicted and then restored into memory. Since it's still on disk the executor should
// still be eligible for removal.
monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.MEMORY_AND_DISK))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
}
test("track executors pending for removal") {
knownExecs ++= Set("1", "2", "3")
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
clock.setTime(idleDeadline)
assert(monitor.timedOutExecutors().toSet === Set("1", "2", "3"))
assert(monitor.pendingRemovalCount === 0)
// Notify that only a subset of executors was killed, to mimic the case where the scheduler
// refuses to kill an executor that is busy for whatever reason the monitor hasn't detected yet.
monitor.executorsKilled(Seq("1"))
assert(monitor.timedOutExecutors().toSet === Set("2", "3"))
assert(monitor.pendingRemovalCount === 1)
// Check the timed out executors again so that we're sure they're still timed out when no
// events happen. This ensures that the monitor doesn't lose track of them.
assert(monitor.timedOutExecutors().toSet === Set("2", "3"))
monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("2", 1)))
assert(monitor.timedOutExecutors().toSet === Set("3"))
monitor.executorsKilled(Seq("3"))
assert(monitor.pendingRemovalCount === 2)
monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, taskInfo("2", 1), null))
assert(monitor.timedOutExecutors().isEmpty)
clock.advance(idleDeadline)
assert(monitor.timedOutExecutors().toSet === Set("2"))
}
private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
private def taskInfo(
execId: String,
id: Int,
speculative: Boolean = false,
duration: Long = -1L): TaskInfo = {
val start = if (duration > 0) clock.getTimeMillis() - duration else clock.getTimeMillis()
val task = new TaskInfo(id, id, 1, start, execId, "foo.example.com",
TaskLocality.PROCESS_LOCAL, speculative)
if (duration > 0) {
task.markFinished(TaskState.FINISHED, math.max(1, clock.getTimeMillis()))
}
task
}
private def rddUpdate(
rddId: Int,
splitIndex: Int,
execId: String,
level: StorageLevel = StorageLevel.MEMORY_ONLY): SparkListenerBlockUpdated = {
SparkListenerBlockUpdated(
BlockUpdatedInfo(BlockManagerId(execId, "1.example.com", 42),
RDDBlockId(rddId, splitIndex), level, 1L, 0L))
}
}


@ -47,7 +47,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
broadcastId, StorageLevel.MEMORY_AND_DISK, memSize = 200, diskSize = 100)
assert(bmInfo.blocks.asScala ===
Map(broadcastId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 100)))
assert(bmInfo.exclusiveCachedBlocks.isEmpty)
assert(bmInfo.remainingMem === 29800)
}
@ -56,7 +55,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
bmInfo.updateBlockInfo(rddId, StorageLevel.MEMORY_ONLY, memSize = 200, diskSize = 0)
assert(bmInfo.blocks.asScala ===
Map(rddId -> BlockStatus(StorageLevel.MEMORY_ONLY, 200, 0)))
assert(bmInfo.exclusiveCachedBlocks === Set(rddId))
assert(bmInfo.remainingMem === 29800)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
@ -70,8 +68,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
bmInfo.updateBlockInfo(rddId, StorageLevel.MEMORY_AND_DISK, memSize = 200, diskSize = 400)
assert(bmInfo.blocks.asScala ===
Map(rddId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
assert(bmInfo.remainingMem === 29800)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@ -85,7 +81,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
assert(bmInfo.blocks.asScala ===
Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
assert(bmInfo.remainingMem === 30000)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@ -99,7 +94,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
val rddId: BlockId = RDDBlockId(0, 0)
bmInfo.updateBlockInfo(rddId, StorageLevel.MEMORY_ONLY, memSize = 200, 0)
assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.MEMORY_ONLY, 200, 0)))
assert(bmInfo.exclusiveCachedBlocks === Set(rddId))
assert(bmInfo.remainingMem === 29800)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
@ -107,8 +101,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
val exclusiveCachedBlocksForNoMemoryOnly = if (svcEnabled) Set() else Set(rddId)
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForNoMemoryOnly)
assert(bmInfo.remainingMem === 30000)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@ -120,8 +112,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
val rddId: BlockId = RDDBlockId(0, 0)
bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
assert(bmInfo.remainingMem === 30000)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@ -130,7 +120,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 200)
assert(bmInfo.blocks.isEmpty)
assert(bmInfo.exclusiveCachedBlocks.isEmpty)
assert(bmInfo.remainingMem === 30000)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
@ -141,8 +130,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
val rddId: BlockId = RDDBlockId(0, 0)
bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, diskSize = 200)
assert(bmInfo.blocks.asScala === Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else Set(rddId)
assert(bmInfo.exclusiveCachedBlocks === exclusiveCachedBlocksForOneMemoryOnly)
assert(bmInfo.remainingMem === 30000)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@ -151,7 +138,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
bmInfo.removeBlock(rddId)
assert(bmInfo.blocks.asScala.isEmpty)
assert(bmInfo.exclusiveCachedBlocks.isEmpty)
assert(bmInfo.remainingMem === 30000)
if (svcEnabled) {
assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)


@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
}
private def withStreamingContext(conf: SparkConf)(body: StreamingContext => Unit): Unit = {
conf.setMaster("myDummyLocalExternalClusterManager")
conf.setMaster("local-cluster[1,1,1024]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.dynamicAllocation.testing", "true") // to test dynamic allocation