[SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors
## What changes were proposed in this pull request?

The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in #15986, the signature's use of parentheses is a little odd given that this is a pure getter method. This patch fixes both issues.

## How was this patch tested?

Covered by existing tests.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.
This commit is contained in:
parent
bc95ea0be5
commit
c51c772594
|
@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
|
|||
*/
|
||||
def getExecutorInfos: Array[SparkExecutorInfo] = {
|
||||
val executorIdToRunningTasks: Map[String, Int] =
|
||||
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
|
||||
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
|
||||
|
||||
sc.getExecutorStorageStatus.map { status =>
|
||||
val bmId = status.blockManagerId
|
||||
|
|
|
@ -96,7 +96,7 @@ private[spark] class TaskSchedulerImpl(
|
|||
// IDs of the tasks running on each executor
|
||||
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
|
||||
|
||||
def runningTasksByExecutors(): Map[String, Int] = {
|
||||
def runningTasksByExecutors: Map[String, Int] = synchronized {
|
||||
executorIdToRunningTaskIds.toMap.mapValues(_.size)
|
||||
}
|
||||
|
||||
|
|
|
@ -678,7 +678,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
|
|||
// Check that state associated with the lost task attempt is cleaned up:
|
||||
assert(taskScheduler.taskIdToExecutorId.isEmpty)
|
||||
assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
|
||||
assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
|
||||
assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
|
||||
}
|
||||
|
||||
test("if a task finishes with TaskState.LOST its executor is marked as dead") {
|
||||
|
@ -709,7 +709,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
|
|||
// Check that state associated with the lost task attempt is cleaned up:
|
||||
assert(taskScheduler.taskIdToExecutorId.isEmpty)
|
||||
assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
|
||||
assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
|
||||
assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
|
||||
|
||||
// Check that the executor has been marked as dead
|
||||
assert(!taskScheduler.isExecutorAlive("executor0"))
|
||||
|
|
Loading…
Reference in a new issue