SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers
`var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection. There are two places that call `replicate(blockId, bytesAfterPut, level)`: *17f3075bc4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala (L644)
runs in `connectionManager.futureExecContext` *17f3075bc4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala (L752)
`doPut` runs in `connectionManager.handleMessageExecutor`. `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`. As they run in different `Executor`s, this is a race condition which may cause the memory pointed to by `cachedPeers` to be incorrect even if `cachedPeers != null`. The race condition on `onReceiveCallback` is that it is set in `BlockManagerWorker` but read from a different thread in `ConnectionManager.handleMessageExecutor`. Author: zsxwing <zsxwing@gmail.com> Closes #887 from zsxwing/SPARK-1932 and squashes the following commits: 524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers
This commit is contained in:
parent
90e281b55a
commit
549830b0db
|
@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
|
|||
implicit val futureExecContext = ExecutionContext.fromExecutor(
|
||||
Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
|
||||
|
||||
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
|
||||
@volatile
|
||||
private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
|
||||
|
||||
private val authEnabled = securityManager.isAuthenticationEnabled()
|
||||
|
||||
|
|
|
@ -772,7 +772,7 @@ private[spark] class BlockManager(
|
|||
/**
|
||||
* Replicate block to another node.
|
||||
*/
|
||||
var cachedPeers: Seq[BlockManagerId] = null
|
||||
@volatile var cachedPeers: Seq[BlockManagerId] = null
|
||||
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
|
||||
val tLevel = StorageLevel(
|
||||
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
|
||||
|
|
Loading…
Reference in a new issue