[SPARK-24021][CORE] fix bug in BlacklistTracker's updateBlacklistForFetchFailure
## What changes were proposed in this pull request? There‘s a miswrite in BlacklistTracker's updateBlacklistForFetchFailure: ``` val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]()) blacklistedExecsOnNode += exec ``` where first **exec** should be **host**. ## How was this patch tested? adjust existed test. Author: wuyi <ngone_5451@163.com> Closes #21104 from Ngone51/SPARK-24021.
This commit is contained in:
parent
d96c3e33cc
commit
0deaa52513
|
@ -210,7 +210,7 @@ private[scheduler] class BlacklistTracker (
|
|||
updateNextExpiryTime()
|
||||
killBlacklistedExecutor(exec)
|
||||
|
||||
val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]())
|
||||
val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
|
||||
blacklistedExecsOnNode += exec
|
||||
}
|
||||
}
|
||||
|
|
|
@ -574,6 +574,9 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
|
|||
verify(allocationClientMock, never).killExecutors(any(), any(), any(), any())
|
||||
verify(allocationClientMock, never).killExecutorsOnHost(any())
|
||||
|
||||
assert(blacklist.nodeToBlacklistedExecs.contains("hostA"))
|
||||
assert(blacklist.nodeToBlacklistedExecs("hostA").contains("1"))
|
||||
|
||||
// Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
|
||||
conf.set(config.BLACKLIST_KILL_ENABLED, true)
|
||||
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
|
||||
|
@ -589,6 +592,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
|
|||
1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
|
||||
assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
|
||||
assert(blacklist.nodeIdToBlacklistExpiryTime.isEmpty)
|
||||
assert(blacklist.nodeToBlacklistedExecs.contains("hostA"))
|
||||
assert(blacklist.nodeToBlacklistedExecs("hostA").contains("1"))
|
||||
|
||||
// Enable external shuffle service to see if all the executors on this node will be killed.
|
||||
conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
|
||||
|
|
Loading…
Reference in a new issue