[SPARK-4031] Make torrent broadcast read blocks on use.

This avoids reading torrent broadcast variables when they are referenced in a closure but never actually used by it. This is done by using a `lazy val` to read broadcast blocks, deferring the read until the value is first accessed; the pattern is sketched below.
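
The mechanism in miniature: a `@transient lazy val` is skipped by Java serialization and evaluated only on first access, so shipping the object inside a closure costs nothing until the value is actually dereferenced. A minimal sketch of the pattern (the class and names here are illustrative, not Spark's API):

```scala
// Illustrative only: the expensive read runs on first access,
// not at construction or deserialization time.
class LazyBroadcastSketch(id: Long) extends Serializable {
  @transient private lazy val value: String = {
    println(s"reading blocks for broadcast $id") // a block fetch would go here
    s"payload-$id"
  }
  def get: String = value
}
```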

cc rxin JoshRosen for review

Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

Closes #2871 from shivaram/broadcast-read-value and squashes the following commits:

1456d65 [Shivaram Venkataraman] Use getUsedTimeMs and remove readObject
d6c5ee9 [Shivaram Venkataraman] Use laxy val to implement readBroadcastBlock
0b34df7 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into broadcast-read-value
9cec507 [Shivaram Venkataraman] Test if broadcast variables are read lazily
768b40b [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into broadcast-read-value
8792ed8 [Shivaram Venkataraman] Make torrent broadcast read blocks on use. This avoids reading broadcast variables when they are referenced in the closure but not used by the code.
Shivaram Venkataraman committed on 2014-10-28 10:14:16 -07:00
commit 7768a800d4
parent 0ac52e3055

3 changed files with 67 additions and 21 deletions

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

@@ -56,11 +56,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
   /**
-   * Value of the broadcast object. On driver, this is set directly by the constructor.
-   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
-   * blocks from the driver and/or other executors.
+   * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
+   * which builds this value by reading blocks from the driver and/or other executors.
+   *
+   * On the driver, if the value is required, it is read lazily from the block manager.
    */
-  @transient private var _value: T = obj
+  @transient private lazy val _value: T = readBroadcastBlock()
 
   /** The compression codec to use, or None if compression is disabled */
   @transient private var compressionCodec: Option[CompressionCodec] = _
 
   /** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */
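
The hunk above relies on a detail of Scala serialization: a `@transient lazy val` is not written out with the object, and its initializer runs again (at most once) in the deserialized copy on first access. A self-contained check of that behavior using plain Java serialization, which is what Spark's closure serializer defaults to:

```scala
import java.io._

object TransientLazyDemo {
  class Holder extends Serializable {
    @transient private lazy val data: Array[Int] = {
      println("computing data") // runs at most once per instance, on first access
      Array.tabulate(4)(identity)
    }
    def first: Int = data(0)
  }

  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(new Holder) // nothing computed yet
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[Holder]
    println(copy.first) // prints "computing data", then 0
  }
}
```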
@@ -79,22 +81,24 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   private val broadcastId = BroadcastBlockId(id)
 
   /** Total number of blocks this broadcast variable contains. */
-  private val numBlocks: Int = writeBlocks()
+  private val numBlocks: Int = writeBlocks(obj)
 
-  override protected def getValue() = _value
+  override protected def getValue() = {
+    _value
+  }
 
   /**
    * Divide the object into multiple blocks and put those blocks in the block manager.
-   *
+   * @param value the object to divide
    * @return number of blocks this broadcast variable is divided into
    */
-  private def writeBlocks(): Int = {
+  private def writeBlocks(value: T): Int = {
     // Store a copy of the broadcast variable in the driver so that tasks run on the driver
     // do not create a duplicate copy of the broadcast variable's value.
-    SparkEnv.get.blockManager.putSingle(broadcastId, _value, StorageLevel.MEMORY_AND_DISK,
+    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
       tellMaster = false)
     val blocks =
-      TorrentBroadcast.blockifyObject(_value, blockSize, SparkEnv.get.serializer, compressionCodec)
+      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
     blocks.zipWithIndex.foreach { case (block, i) =>
       SparkEnv.get.blockManager.putBytes(
         BroadcastBlockId(id, "piece" + i),
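
Note that `writeBlocks` now takes the value explicitly rather than reading `_value`, which would itself trigger the lazy read on the driver. The blockify step it delegates to is, at heart, fixed-size chunking of a serialized payload; a rough sketch of that idea (ignoring serialization and compression, which the real `blockifyObject` also handles):

```scala
import java.nio.ByteBuffer

// Split a payload into fixed-size pieces that can be stored and fetched
// independently, in the spirit of TorrentBroadcast.blockifyObject.
def blockify(payload: Array[Byte], blockSize: Int): Array[ByteBuffer] =
  payload.grouped(blockSize).map(ByteBuffer.wrap).toArray

val pieces = blockify(Array.fill[Byte](10 * 1024 * 1024)(1), 4 * 1024 * 1024)
assert(pieces.length == 3) // 4 MB + 4 MB + 2 MB
```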
@@ -157,31 +161,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
     out.defaultWriteObject()
   }
 
-  /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
-    in.defaultReadObject()
+  private def readBroadcastBlock(): T = Utils.tryOrIOException {
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
       SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
         case Some(x) =>
-          _value = x.asInstanceOf[T]
+          x.asInstanceOf[T]
 
         case None =>
           logInfo("Started reading broadcast variable " + id)
-          val start = System.nanoTime()
+          val startTimeMs = System.currentTimeMillis()
           val blocks = readBlocks()
-          val time = (System.nanoTime() - start) / 1e9
-          logInfo("Reading broadcast variable " + id + " took " + time + " s")
+          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
 
-          _value =
-            TorrentBroadcast.unBlockifyObject[T](blocks, SparkEnv.get.serializer, compressionCodec)
+          val obj = TorrentBroadcast.unBlockifyObject[T](
+            blocks, SparkEnv.get.serializer, compressionCodec)
           // Store the merged copy in BlockManager so other tasks on this executor don't
           // need to re-fetch it.
           SparkEnv.get.blockManager.putSingle(
-            broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+            broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+          obj
       }
     }
   }
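
`readBroadcastBlock` follows a check-local-then-fetch-and-cache shape: a local block-manager hit returns immediately, while a miss fetches the pieces, reassembles the value, and stores the merged copy so later tasks on the same executor skip the fetch. The same shape in miniature, with an in-memory map standing in for the block manager (names here are illustrative):

```scala
import scala.collection.mutable

// Illustrative cache with the same read path as readBroadcastBlock.
class FetchOnceCache[K, V](fetch: K => V) {
  private val store = mutable.Map.empty[K, V]

  def getOrFetch(key: K): V = synchronized {
    store.get(key) match {
      case Some(v) => v    // local hit: no remote read
      case None =>
        val v = fetch(key) // stands in for readBlocks() + unBlockifyObject
        store(key) = v     // stands in for putSingle(...), reused by later tasks
        v
    }
  }
}
```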

core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -988,6 +988,21 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code that returns a value, re-throwing any non-fatal uncaught
+   * exceptions as IOException. This is used when implementing Externalizable and Serializable's
+   * read and write methods, since Java's serializer will not report non-IOExceptions properly;
+   * see SPARK-4080 for more context.
+   */
+  def tryOrIOException[T](block: => T): T = {
+    try {
+      block
+    } catch {
+      case e: IOException => throw e
+      case NonFatal(t) => throw new IOException(t)
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when
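
The helper exists because Java serialization only propagates `IOException` cleanly out of `readObject`/`writeObject`; other exception types can be misreported (see SPARK-4080). A usage sketch, with the helper copied out of `Utils` so the snippet stands alone (the `Wire` class is illustrative):

```scala
import java.io.{IOException, ObjectInputStream}
import scala.util.control.NonFatal

// Standalone copy of the helper above, for illustration.
def tryOrIOException[T](block: => T): T =
  try block catch {
    case e: IOException => throw e
    case NonFatal(t) => throw new IOException(t)
  }

class Wire(val payload: String) extends Serializable {
  // Wrap custom deserialization so non-fatal failures reach the caller
  // as IOException instead of being misreported by the serializer.
  private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
    in.defaultReadObject()
  }
}
```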

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala

@@ -21,11 +21,28 @@ import scala.util.Random
 
 import org.scalatest.{Assertions, FunSuite}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkEnv}
 import org.apache.spark.io.SnappyCompressionCodec
+import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage._
 
+// Dummy class that creates a broadcast variable but doesn't use it
+class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
+  @transient val list = List(1, 2, 3, 4)
+  val broadcast = rdd.context.broadcast(list)
+  val bid = broadcast.id
+
+  def doSomething() = {
+    rdd.map { x =>
+      val bm = SparkEnv.get.blockManager
+      // Check if broadcast block was fetched
+      val isFound = bm.getLocal(BroadcastBlockId(bid)).isDefined
+      (x, isFound)
+    }.collect().toSet
+  }
+}
+
 class BroadcastSuite extends FunSuite with LocalSparkContext {
 
   private val httpConf = broadcastConf("HttpBroadcastFactory")
@@ -105,6 +122,17 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("Test Lazy Broadcast variables with TorrentBroadcast") {
+    val numSlaves = 2
+    val conf = torrentConf.clone
+    sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf)
+    val rdd = sc.parallelize(1 to numSlaves)
+
+    val results = new DummyBroadcastClass(rdd).doSomething()
+
+    assert(results.toSet === (1 to numSlaves).map(x => (x, false)).toSet)
+  }
+
   test("Unpersisting HttpBroadcast on executors only in local mode") {
     testUnpersistHttpBroadcast(distributed = false, removeFromDriver = false)
   }
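
The new test asserts `(x, false)` for every element: merely capturing the broadcast in the closure no longer fetches its blocks. For contrast, a task that dereferences `broadcast.value` would force the lazy read, and the same block-manager check would then report `true`. An illustrative counterpart to `doSomething()`, not part of this commit:

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.BroadcastBlockId

// Hypothetical variant that uses the broadcast, forcing the block fetch.
def useBroadcast(rdd: RDD[Int], broadcast: Broadcast[List[Int]]): Set[(Int, Boolean)] = {
  val bid = broadcast.id
  rdd.map { x =>
    broadcast.value                                   // dereference forces the fetch
    val bm = SparkEnv.get.blockManager
    (x, bm.getLocal(BroadcastBlockId(bid)).isDefined) // now true on each executor
  }.collect().toSet
}
```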