Fix replication with MEMORY_ONLY_DESER_2

This commit is contained in:
Matei Zaharia 2012-08-26 18:16:25 -07:00
parent 57796b183e
commit 29e83f39e9

View file

@ -364,6 +364,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val startTimeMs = System.currentTimeMillis
var bytes: ByteBuffer = null
// If we need to replicate the data, we'll want access to the values, but because our
// put will read the whole iterator, there will be no values left. For the case where
// the put serializes data, we'll remember the bytes, above; but for the case where
// it doesn't, such as MEMORY_ONLY_DESER, let's rely on the put returning an Iterator.
var valuesAfterPut: Iterator[Any] = null
locker.getLock(blockId).synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
@ -391,7 +397,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// If only save to memory
memoryStore.putValues(blockId, values, level) match {
case Right(newBytes) => bytes = newBytes
case _ =>
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
// If only save to disk
@ -408,8 +414,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Replicate block if required
if (level.replication > 1) {
// Serialize the block if not already done
if (bytes == null) {
bytes = dataSerialize(values) // serialize the block if not already done
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytes = dataSerialize(valuesAfterPut)
}
replicate(blockId, bytes, level)
}