[SPARK-30786][CORE] Fix Block replication failure propagation issue in BlockManager

### What changes were proposed in this pull request?
Currently the `uploadBlockSync` API in BlockTransferService always succeeds, irrespective of whether the BlockManager was able to replicate a block on a peer block manager or not. This PR makes sure that NettyBlockRpcServer invokes the `onFailure` callback when it fails to store the block on the local BlockManager for any reason. The `onFailure` callback ensures that the BlockTransferService on the client side sees the failure and retries replicating the block on some other BlockManager.

### Why are the changes needed?
Currently the Spark block replication retry logic is not working correctly: it does not retry on other block managers even when replication fails on one of the peers.

A user can cache a DataFrame with a replication factor, e.g. `df.persist(StorageLevel.MEMORY_ONLY_2)`, which caches each partition on two different BlockManagers. When a DataFrame partition is computed for the first time, it is first stored on the local BlockManager and then replicated to other block managers according to the configured replication factor. The replication to other block managers might fail because of memory, network, or other issues, and there is already a provision to retry the replication on some other peer, governed by the `spark.storage.maxReplicationFailures` config. However, when replication fails today, the client does not learn about the failure and so never retries on other peers. This PR fixes this issue.
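For illustration, here is a minimal sketch of that scenario. The config key and storage level are real; the app name and DataFrame contents are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Allow one replication retry per block (1 is also the default).
val conf = new SparkConf()
  .set("spark.storage.maxReplicationFailures", "1")

val spark = SparkSession.builder()
  .config(conf)
  .appName("replication-demo") // placeholder name
  .getOrCreate()

val df = spark.range(0, 1000000).toDF("id")
// Each cached partition is stored on the local BlockManager and replicated
// to one peer. With this fix, a failed peer write is retried on another
// peer instead of being silently dropped.
df.persist(StorageLevel.MEMORY_ONLY_2)
df.count() // materializes the cache, triggering replication
```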

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added a unit test.

Closes #27539 from prakharjain09/bm_replicate.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
3 changed files with 69 additions and 6 deletions

#### NettyBlockRpcServer.scala

```diff
@@ -105,8 +105,14 @@ class NettyBlockRpcServer(
         val blockId = BlockId(uploadBlock.blockId)
         logDebug(s"Receiving replicated block $blockId with level ${level} " +
           s"from ${client.getSocketAddress}")
-        blockManager.putBlockData(blockId, data, level, classTag)
-        responseContext.onSuccess(ByteBuffer.allocate(0))
+        val blockStored = blockManager.putBlockData(blockId, data, level, classTag)
+        if (blockStored) {
+          responseContext.onSuccess(ByteBuffer.allocate(0))
+        } else {
+          val exception = new Exception(s"Upload block for $blockId failed. This mostly happens " +
+            s"when there is not sufficient space available to store the block.")
+          responseContext.onFailure(exception)
+        }
       }
     }
```
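For context on how the failure now reaches the replicating side: the client's `uploadBlockSync` blocks on the upload future, so the `onFailure` above surfaces as an exception in the caller, which can drop that peer and try another. Below is a self-contained sketch of that pattern; it is not Spark code, and every name in it is an illustrative stand-in:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object ReplicationRetrySketch {
  // Stand-in for the server side (NettyBlockRpcServer): try to store the
  // block, then complete the RPC with success or failure accordingly.
  def serverPut(peer: String, blockStored: Boolean): Future[Unit] = {
    val p = Promise[Unit]()
    if (blockStored) p.success(())                        // responseContext.onSuccess
    else p.failure(new Exception(s"put failed on $peer")) // responseContext.onFailure
    p.future
  }

  // Stand-in for BlockTransferService.uploadBlockSync: wait on the future,
  // so a server-side failure surfaces here as a thrown exception.
  def uploadBlockSync(peer: String, blockStored: Boolean): Unit =
    Await.result(serverPut(peer, blockStored), Duration.Inf)

  def main(args: Array[String]): Unit = {
    val peers = Iterator("host-2" -> false, "host-3" -> true) // host-2 is faulty
    val maxReplicationFailures = 1
    var failures = 0
    var replicatedTo: Option[String] = None
    while (replicatedTo.isEmpty && failures <= maxReplicationFailures && peers.hasNext) {
      val (peer, healthy) = peers.next()
      Try(uploadBlockSync(peer, healthy)) match {
        case Success(_) => replicatedTo = Some(peer) // replica stored
        case Failure(_) => failures += 1             // try the next peer
      }
    }
    println(s"replicated to $replicatedTo after $failures failure(s)")
  }
}
```

Before this PR, the stand-in `serverPut` would effectively always complete successfully, so the loop above would never advance past the faulty peer.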

#### BlockManager.scala

```diff
@@ -666,7 +666,11 @@ private[spark] class BlockManager(
           // stream.
           channel.close()
           val blockSize = channel.getCount
-          TempFileBasedBlockStoreUpdater(blockId, level, classTag, tmpFile, blockSize).save()
+          val blockStored = TempFileBasedBlockStoreUpdater(
+            blockId, level, classTag, tmpFile, blockSize).save()
+          if (!blockStored) {
+            throw new Exception(s"Failure while trying to store block $blockId on $blockManagerId.")
+          }
         }
 
         override def onFailure(streamId: String, cause: Throwable): Unit = {
```
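The streaming path above is taken when a replicated block is uploaded as a stream rather than materialized in memory. As the test change below notes, this can be forced by setting the remote-block fetch-to-memory threshold to zero; a minimal sketch, assuming the Spark 3.0 config key `spark.network.maxRemoteBlockSizeFetchToMem` (verify the key name for your Spark version):

```scala
import org.apache.spark.SparkConf

// Route every remote block transfer through the streaming (disk-backed)
// upload path, exercising the onComplete callback shown above.
val conf = new SparkConf()
  .set("spark.network.maxRemoteBlockSizeFetchToMem", "0")
```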

#### BlockManagerReplicationSuite.scala

```diff
@@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 
-import org.mockito.Mockito.{mock, when}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{doAnswer, mock, spy, when}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
@@ -69,11 +70,12 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
   protected def makeBlockManager(
       maxMem: Long,
-      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+      name: String = SparkContext.DRIVER_IDENTIFIER,
+      memoryManager: Option[UnifiedMemoryManager] = None): BlockManager = {
     conf.set(TEST_MEMORY, maxMem)
     conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
     val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
-    val memManager = UnifiedMemoryManager(conf, numCores = 1)
+    val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1))
     val serializerManager = new SerializerManager(serializer, conf)
     val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None)
```
```diff
@@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }
 
+  Seq(false, true).foreach { stream =>
+    test(s"test block replication failures when block is received " +
+      s"by remote block manager but putBlock fails (stream = $stream)") {
+      // Retry replication logic for 1 failure
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+      // Custom block replication policy which prioritizes BlockManagers as per hostnames
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To use the upload-block-stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+
+      // Create two normal block managers
+      val store1 = makeBlockManager(10000, "host-1")
+      val store3 = makeBlockManager(10000, "host-3")
+
+      // Create one faulty block manager by injecting a faulty memory manager
+      val memManager = UnifiedMemoryManager(conf, numCores = 1)
+      val mockedMemoryManager = spy(memManager)
+      doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
+      val store2 = makeBlockManager(10000, "host-2", Some(mockedMemoryManager))
+
+      assert(master.getPeers(store1.blockManagerId).toSet ===
+        Set(store2.blockManagerId, store3.blockManagerId))
+
+      val blockId = "blockId"
+      val message = new Array[Byte](1000)
+      // Replication will be tried by store1 in this order: store2, store3.
+      // store2 is the faulty block manager, so it won't be able to put the block;
+      // store1 will then try to replicate the block on store3.
+      store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+      val blockLocations = master.getLocations(blockId).toSet
+      assert(blockLocations === Set(store1.blockManagerId, store3.blockManagerId))
+    }
+  }
+
   test("block replication - addition and deletion of block managers") {
     val blockSize = 1000
     val storeSize = 10000
```
```diff
@@ -509,3 +548,17 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
     classOf[DummyTopologyMapper].getName)
 }
+
+// BlockReplicationPolicy to prioritize BlockManagers based on hostnames.
+// Example: for BM-x(host-2), BM-y(host-1), BM-z(host-3), it will prioritize them as
+// BM-y(host-1), BM-x(host-2), BM-z(host-3)
+class SortOnHostNameBlockReplicationPolicy
+  extends BlockReplicationPolicy {
+
+  override def prioritize(
+      blockManagerId: BlockManagerId,
+      peers: Seq[BlockManagerId],
+      peersReplicatedTo: mutable.HashSet[BlockManagerId],
+      blockId: BlockId,
+      numReplicas: Int): List[BlockManagerId] = {
+    peers.sortBy(_.host).toList
+  }
+}
```
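The custom policy exists only to make the peer order deterministic, so the faulty host-2 manager is always tried before host-3. A tiny standalone illustration of that ordering, using a hypothetical `Peer` type in place of Spark's `BlockManagerId`:

```scala
// Hypothetical stand-in for BlockManagerId, just to show the sort order.
case class Peer(executorId: String, host: String)

val peers = Seq(Peer("x", "host-2"), Peer("y", "host-1"), Peer("z", "host-3"))
// Sorting by host yields host-1, host-2, host-3: the order the tests rely on.
assert(peers.sortBy(_.host).map(_.host) == List("host-1", "host-2", "host-3"))
```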