diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index ff9914ae25..45f99717bc 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -364,6 +364,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 
     val startTimeMs = System.currentTimeMillis
     var bytes: ByteBuffer = null
+
+    // If we need to replicate the data, we'll want access to the values, but because our
+    // put will read the whole iterator, there will be no values left. For the case where
+    // the put serializes data, we'll remember the bytes, above; but for the case where
+    // it doesn't, such as MEMORY_ONLY_DESER, let's rely on the put returning an Iterator.
+    var valuesAfterPut: Iterator[Any] = null
 
     locker.getLock(blockId).synchronized {
       logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
@@ -391,7 +397,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
           // If only save to memory
           memoryStore.putValues(blockId, values, level) match {
             case Right(newBytes) => bytes = newBytes
-            case _ =>
+            case Left(newIterator) => valuesAfterPut = newIterator
           }
         } else {
           // If only save to disk
@@ -408,8 +414,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 
     // Replicate block if required
     if (level.replication > 1) {
+      // Serialize the block if not already done
      if (bytes == null) {
-        bytes = dataSerialize(values) // serialize the block if not already done
+        if (valuesAfterPut == null) {
+          throw new SparkException(
+            "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+        }
+        bytes = dataSerialize(valuesAfterPut)
       }
       replicate(blockId, bytes, level)
     }
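
The key contract this patch relies on is that the store's `putValues` returns an `Either`: `Right` carries already-serialized bytes, while `Left` hands back a fresh iterator over the stored values, so the replication path can still serialize the data after the caller's original iterator has been consumed. Below is a minimal, self-contained Scala sketch of that pattern; the store, the serialization, and the `main` harness are simplified stand-ins for illustration, not Spark's actual implementation.

```scala
import java.nio.ByteBuffer

// Hypothetical stand-ins for the BlockManager pieces touched by this patch;
// the names mirror the diff but the bodies are invented for illustration only.
object PutReplicationSketch {

  // A put either serializes eagerly (Right: the bytes) or stores deserialized
  // objects and returns a fresh iterator over them (Left: the values).
  def putValues(values: Iterator[Any], serialize: Boolean): Either[Iterator[Any], ByteBuffer] = {
    val stored = values.toArray // the put consumes the caller's iterator
    if (serialize) {
      Right(ByteBuffer.wrap(stored.mkString(",").getBytes("UTF-8")))
    } else {
      Left(stored.iterator) // replacement iterator over the stored objects
    }
  }

  // Stand-in for dataSerialize in the diff.
  def dataSerialize(values: Iterator[Any]): ByteBuffer =
    ByteBuffer.wrap(values.mkString(",").getBytes("UTF-8"))

  def main(args: Array[String]): Unit = {
    val values = Iterator(1, 2, 3)
    var bytes: ByteBuffer = null
    var valuesAfterPut: Iterator[Any] = null

    // Mirrors the patched match: remember whichever representation comes back.
    putValues(values, serialize = false) match {
      case Right(newBytes)   => bytes = newBytes
      case Left(newIterator) => valuesAfterPut = newIterator
    }

    // Replication path: `values` is exhausted by now, so if we don't already
    // have bytes, serialize from the iterator the put handed back instead.
    if (bytes == null) {
      require(valuesAfterPut != null, "put returned neither an Iterator nor bytes")
      bytes = dataSerialize(valuesAfterPut)
    }
    println(s"replicating ${bytes.remaining()} bytes")
  }
}
```

The `Left`/`Right` split spares a redundant pass: when the put already serialized the block (e.g. a serialized memory level), the bytes are reused directly; only the deserialized case pays for serialization at replication time.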