Revert "[SPARK-35011][CORE] Avoid Block Manager registrations when StopExecutor msg is in-flight"

This reverts commit b9e53f8937.

### What changes were proposed in this pull request?

Revert https://github.com/apache/spark/pull/32114

### Why are the changes needed?

It breaks the expected `BlockManager` re-registration (e.g., after a heartbeat loss from an active executor) because the removal of the `BlockManagerInfo` entry is deferred; see the check:
9cefde8db3/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala (L551)
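
For illustration, here is a minimal, self-contained sketch of the master-side heartbeat check involved (hypothetical, simplified names modeled on `BlockManagerMasterEndpoint`; not the actual Spark source). Returning `false` is what tells an executor to re-register; since SPARK-35011 keeps the stale entry in the map until a background cleaner expires it, the `Some(...)` branch keeps being taken:

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the heartbeat check; all names here are assumptions.
object HeartbeatCheckSketch {
  case class BlockManagerId(executorId: String)
  class BlockManagerInfo { var lastSeenMs: Long = 0L }

  val blockManagerInfo = mutable.Map[BlockManagerId, BlockManagerInfo]()

  // Returns true if the heartbeat is accepted; false asks the executor to re-register.
  def heartbeatReceived(id: BlockManagerId): Boolean =
    blockManagerInfo.get(id) match {
      case Some(info) =>
        // With SPARK-35011, removeExecutor() only stamps a removal timestamp and leaves
        // the entry in this map, so a removed-but-not-yet-expired executor still lands
        // in this branch and is never told to re-register.
        info.lastSeenMs = System.currentTimeMillis()
        true
      case None =>
        false
    }
}
```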

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass existing tests.

Closes #33959 from Ngone51/revert-35011-3.2.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Commit: dcb57ab42b (parent: b52fbeee2d)
Author: yi.wu, 2021-09-10 09:29:55 -07:00; committed by Dongjoon Hyun
4 changed files with 40 additions and 109 deletions

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

@@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = Utils.executorTimeoutMs(sc.conf)
+  private val executorTimeoutMs = sc.conf.get(
+    config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
+  ).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))
 
   private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

@@ -96,15 +96,6 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
-  private val executorTimeoutMs = Utils.executorTimeoutMs(conf)
-
-  private val blockManagerInfoCleaner = {
-    val cleaningDelay = Math.floorDiv(executorTimeoutMs, 2L)
-    val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("blockManagerInfo-cleaner")
-    executor.scheduleWithFixedDelay(() => cleanBlockManagerInfo(), cleaningDelay, cleaningDelay,
-      TimeUnit.MILLISECONDS)
-    executor
-  }
-
   val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
 
   val defaultRpcTimeout = RpcUtils.askRpcTimeout(conf)
@@ -282,12 +273,12 @@
         }
       }
       bmIdsExecutor.foreach { bmId =>
-        aliveBlockManagerInfo(bmId).foreach { bmInfo =>
+        blockManagerInfo.get(bmId).foreach { bmInfo =>
           bmInfo.removeBlock(blockId)
         }
       }
     }
-    val removeRddFromExecutorsFutures = allAliveBlockManagerInfos.map { bmInfo =>
+    val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
       bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
         // use 0 as default value means no blocks were removed
         handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
@@ -314,7 +305,7 @@
     // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
-      allAliveBlockManagerInfos.map { bm =>
+      blockManagerInfo.values.map { bm =>
         bm.storageEndpoint.ask[Boolean](removeMsg).recover {
           // use false as default value means no shuffle data were removed
           handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
@@ -330,7 +321,7 @@
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
-    val requiredBlockManagers = allAliveBlockManagerInfos.filter { info =>
+    val requiredBlockManagers = blockManagerInfo.values.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
     val futures = requiredBlockManagers.map { bm =>
@@ -346,24 +337,13 @@
   private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
     val info = blockManagerInfo(blockManagerId)
 
-    // SPARK-35011: Not removing info from the blockManagerInfo map, but only setting the removal
-    // timestamp of the executor in BlockManagerInfo. This info will be removed from
-    // blockManagerInfo map by the blockManagerInfoCleaner once
-    // now() - info.executorRemovalTs > executorTimeoutMs.
-    //
-    // We are delaying the removal of BlockManagerInfo to avoid a BlockManager reregistration
-    // while a executor is shutting. This unwanted reregistration causes inconsistent bookkeeping
-    // of executors in Spark.
-    // Delaying this removal until blockManagerInfoCleaner decides to remove it ensures
-    // BlockManagerMasterHeartbeatEndpoint does not ask the BlockManager on a recently removed
-    // executor to reregister on BlockManagerHeartbeat message.
-    info.setExecutorRemovalTs()
-
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
 
     decommissioningBlockManagerSet.remove(blockManagerId)
 
-    // remove all the blocks.
+    // Remove it from blockManagerInfo and remove all the blocks.
+    blockManagerInfo.remove(blockManagerId)
+
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
@@ -384,7 +364,7 @@
       val i = (new Random(blockId.hashCode)).nextInt(locations.size)
       val blockLocations = locations.toSeq
       val candidateBMId = blockLocations(i)
-      aliveBlockManagerInfo(candidateBMId).foreach { bm =>
+      blockManagerInfo.get(candidateBMId).foreach { bm =>
         val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
         val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
         bm.storageEndpoint.ask[Boolean](replicateMsg)
@@ -420,7 +400,8 @@
    */
   private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
     try {
-      aliveBlockManagerInfo(blockManagerId).map { info =>
+      val info = blockManagerInfo(blockManagerId)
+
       val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
       rddBlocks.map { blockId =>
         val currentBlockLocations = blockLocations.get(blockId)
@@ -429,7 +410,6 @@
         val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
         replicateMsg
       }.toSeq
-      }.getOrElse(Seq.empty[ReplicateBlock])
     } catch {
       // If the block manager has already exited, nothing to replicate.
       case e: java.util.NoSuchElementException =>
@@ -443,7 +423,8 @@
     val locations = blockLocations.get(blockId)
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
-        aliveBlockManagerInfo(blockManagerId).foreach { bm =>
+        val blockManager = blockManagerInfo.get(blockManagerId)
+        blockManager.foreach { bm =>
           // Remove the block from the BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
@@ -458,14 +439,14 @@
   // Return a map from the block manager id to max memory and remaining memory.
   private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    allAliveBlockManagerInfos.map { info =>
-      (info.blockManagerId, (info.maxMem, info.remainingMem))
+    blockManagerInfo.map { case(blockManagerId, info) =>
+      (blockManagerId, (info.maxMem, info.remainingMem))
     }.toMap
   }
 
   private def storageStatus: Array[StorageStatus] = {
-    allAliveBlockManagerInfos.map { info =>
-      new StorageStatus(info.blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+    blockManagerInfo.map { case (blockManagerId, info) =>
+      new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
         Some(info.maxOffHeapMem), info.blocks.asScala)
     }.toArray
   }
@@ -487,7 +468,7 @@
      * Futures to avoid potential deadlocks. This can arise if there exists a block manager
      * that is also waiting for this master endpoint's response to a previous message.
      */
-    allAliveBlockManagerInfos.map { info =>
+    blockManagerInfo.values.map { info =>
       val blockStatusFuture =
         if (askStorageEndpoints) {
           info.storageEndpoint.ask[Option[BlockStatus]](getBlockStatus)
@@ -511,7 +492,7 @@
       askStorageEndpoints: Boolean): Future[Seq[BlockId]] = {
     val getMatchingBlockIds = GetMatchingBlockIds(filter)
     Future.sequence(
-      allAliveBlockManagerInfos.map { info =>
+      blockManagerInfo.values.map { info =>
         val future =
           if (askStorageEndpoints) {
             info.storageEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
@@ -581,10 +562,9 @@
       if (pushBasedShuffleEnabled) {
         addMergerLocation(id)
       }
-      listenerBus.post(SparkListenerBlockManagerAdded(time, id,
-        maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     }
+    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
+      Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     id
   }
@@ -672,7 +652,7 @@
     if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
       blockStatusByShuffleService.get(bmId).flatMap(m => m.get(blockId))
     } else {
-      aliveBlockManagerInfo(bmId).flatMap(_.getStatus(blockId))
+      blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
     }
   }
@@ -683,7 +663,8 @@
         // can be used to access this block even when the original executor is already stopped.
         loc.host == requesterHost &&
           (loc.port == externalShuffleServicePort ||
-            aliveBlockManagerInfo(loc)
+            blockManagerInfo
+              .get(loc)
               .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
               .getOrElse(false))
       }.flatMap { bmId => Option(executorIdToLocalDirs.getIfPresent(bmId.executorId)) }
@@ -700,7 +681,7 @@
   /** Get the list of the peers of the given block manager */
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    val blockManagerIds = allAliveBlockManagerInfos.map(_.blockManagerId).toSet
+    val blockManagerIds = blockManagerInfo.keySet
     if (blockManagerIds.contains(blockManagerId)) {
       blockManagerIds
         .filterNot { _.isDriver }
@@ -753,7 +734,7 @@
   private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- aliveBlockManagerInfo(blockManagerId)
+      info <- blockManagerInfo.get(blockManagerId)
     ) yield {
       info.storageEndpoint
     }
@@ -761,27 +742,7 @@
   override def onStop(): Unit = {
     askThreadPool.shutdownNow()
-    blockManagerInfoCleaner.shutdownNow()
   }
-
-  private def cleanBlockManagerInfo(): Unit = {
-    logDebug("Cleaning blockManagerInfo")
-    val now = System.currentTimeMillis()
-    val expiredBmIds = blockManagerInfo.filter { case (_, bmInfo) =>
-      // bmInfo.executorRemovalTs.get cannot be None when BM is not alive
-      !bmInfo.isAlive && (now - bmInfo.executorRemovalTs.get) > executorTimeoutMs
-    }.keys
-    expiredBmIds.foreach { bmId =>
-      logInfo(s"Cleaning expired $bmId from blockManagerInfo")
-      blockManagerInfo.remove(bmId)
-    }
-  }
-
-  @inline private def aliveBlockManagerInfo(bmId: BlockManagerId): Option[BlockManagerInfo] =
-    blockManagerInfo.get(bmId).filter(_.isAlive)
-
-  @inline private def allAliveBlockManagerInfos: Iterable[BlockManagerInfo] =
-    blockManagerInfo.values.filter(_.isAlive)
 }
 
 @DeveloperApi
@@ -834,7 +795,6 @@ private[spark] class BlockManagerInfo(
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
-  private var _executorRemovalTs: Option[Long] = None
 
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
@@ -949,16 +909,4 @@ private[spark] class BlockManagerInfo(
   def clear(): Unit = {
     _blocks.clear()
   }
-
-  def executorRemovalTs: Option[Long] = _executorRemovalTs
-
-  def isAlive: Boolean = _executorRemovalTs.isEmpty
-
-  def setExecutorRemovalTs(): Unit = {
-    if (!isAlive) {
-      logWarning(s"executorRemovalTs is already set to ${_executorRemovalTs.get}")
-    } else {
-      _executorRemovalTs = Some(System.currentTimeMillis())
-    }
-  }
 }

core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -3090,13 +3090,6 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  def executorTimeoutMs(conf: SparkConf): Long = {
-    // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
-    // "milliseconds"
-    conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
-      .getOrElse(Utils.timeStringAsMs(s"${conf.get(Network.NETWORK_TIMEOUT)}s"))
-  }
-
   /** Returns a string message about delegation token generation failure */
   def createFailedToGetTokenMessage(serviceName: String, e: scala.Throwable): String = {
     val message = "Failed to get token from service %s due to %s. " +

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{Future, TimeoutException}
 import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.RandomUtils
@@ -101,7 +101,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
       .set(Network.RPC_ASK_TIMEOUT, "5s")
       .set(PUSH_BASED_SHUFFLE_ENABLED, true)
-      .set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "5s")
   }
 
   private def makeSortShuffleManager(conf: Option[SparkConf] = None): SortShuffleManager = {
@@ -611,7 +610,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
   }
 
-  test("no reregistration on heart beat until executor timeout") {
+  test("reregistration on heart beat") {
     val store = makeBlockManager(2000)
     val a1 = new Array[Byte](400)
@@ -622,15 +621,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
-      BlockManagerHeartbeat(store.blockManagerId))
-    assert(reregister == false, "master told to re-register")
-
-    eventually(timeout(10 seconds), interval(1 seconds)) {
-      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
-        BlockManagerHeartbeat(store.blockManagerId))
-      assert(reregister, "master did not tell to re-register")
-    }
+    val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
+      BlockManagerHeartbeat(store.blockManagerId))
+    assert(reregister)
   }
 
   test("reregistration on block update") {
@@ -644,12 +638,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    eventually(timeout(10 seconds), interval(1 seconds)) {
-      val reregister = !master.driverHeartbeatEndPoint.askSync[Boolean](
-        BlockManagerHeartbeat(store.blockManagerId))
-      assert(reregister, "master did not tell to re-register")
-    }
-
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()