Removed unnecessary code and added a comment on the memory-latency tradeoff.

This commit is contained in:
Mosharaf Chowdhury 2013-10-14 09:40:51 -07:00
parent 4602e2bf6e
commit 6e5a60fab4

View file

@@ -89,7 +89,12 @@ extends Broadcast[T](id) with Logging with Serializable {
if (receiveBroadcast(id)) {
value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
SparkEnv.get.blockManager.putSingle(broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
// Store the merged copy in cache so that the next worker doesn't need to rebuild it.
// This creates a tradeoff between memory usage and latency.
// Storing a copy doubles the memory footprint; not storing it doubles the deserialization cost.
SparkEnv.get.blockManager.putSingle(
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, false)
// Remove arrayOfBlocks from memory once value_ is in the local cache
resetWorkerVariables()
@@ -111,9 +116,6 @@ extends Broadcast[T](id) with Logging with Serializable {
}
def receiveBroadcast(variableID: Long): Boolean = {
if (totalBlocks > 0 && totalBlocks == hasBlocks)
return true
// Receive meta-info
val metaId = broadcastId + "_meta"
var attemptId = 10