[SPARK-4064]NioBlockTransferService.fetchBlocks may cause spark to hang.
cc @rxin Author: GuoQiang Li <witgo@qq.com> Closes #2929 from witgo/SPARK-4064 and squashes the following commits: 20110f2 [GuoQiang Li] Modify the exception msg 3425225 [GuoQiang Li] review commits 2b07e49 [GuoQiang Li] If we create a lot of big broadcast variables, Spark may hang
This commit is contained in:
parent
0c34fa5b4b
commit
7c0c26cd12
|
@ -95,16 +95,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
|
|||
future.onSuccess { case message =>
|
||||
val bufferMessage = message.asInstanceOf[BufferMessage]
|
||||
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
|
||||
|
||||
for (blockMessage <- blockMessageArray) {
|
||||
if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
|
||||
listener.onBlockFetchFailure(
|
||||
new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId"))
|
||||
} else {
|
||||
val blockId = blockMessage.getId
|
||||
val networkSize = blockMessage.getData.limit()
|
||||
listener.onBlockFetchSuccess(
|
||||
blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
|
||||
// SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty.
|
||||
if (blockMessageArray.isEmpty) {
|
||||
listener.onBlockFetchFailure(
|
||||
new SparkException(s"Received empty message from $cmId"))
|
||||
} else {
|
||||
for (blockMessage <- blockMessageArray) {
|
||||
val msgType = blockMessage.getType
|
||||
if (msgType != BlockMessage.TYPE_GOT_BLOCK) {
|
||||
listener.onBlockFetchFailure(
|
||||
new SparkException(s"Unexpected message ${msgType} received from $cmId"))
|
||||
} else {
|
||||
val blockId = blockMessage.getId
|
||||
listener.onBlockFetchSuccess(
|
||||
blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
|
||||
}
|
||||
}
|
||||
}
|
||||
}(cm.futureExecContext)
|
||||
|
|
Loading…
Reference in a new issue