Tightening visibility for various Broadcast-related classes.
In preparation for SPARK-2521. Author: Reynold Xin <rxin@apache.org>. Closes #1438 from rxin/broadcast and squashes the following commits: 432f1cc [Reynold Xin] Tightening visibility for various Broadcast-related classes.
This commit is contained in:
parent
33e64ecacb
commit
efe2a8b126
|
@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
|
||||||
* Actually get the broadcasted value. Concrete implementations of Broadcast class must
|
* Actually get the broadcasted value. Concrete implementations of Broadcast class must
|
||||||
* define their own way to get the value.
|
* define their own way to get the value.
|
||||||
*/
|
*/
|
||||||
private[spark] def getValue(): T
|
protected def getValue(): T
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Actually unpersist the broadcasted value on the executors. Concrete implementations of
|
* Actually unpersist the broadcasted value on the executors. Concrete implementations of
|
||||||
* Broadcast class must define their own logic to unpersist their own data.
|
* Broadcast class must define their own logic to unpersist their own data.
|
||||||
*/
|
*/
|
||||||
private[spark] def doUnpersist(blocking: Boolean)
|
protected def doUnpersist(blocking: Boolean)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Actually destroy all data and metadata related to this broadcast variable.
|
* Actually destroy all data and metadata related to this broadcast variable.
|
||||||
* Implementations of the Broadcast class must define their own logic to destroy their own
|
* Implementations of the Broadcast class must define their own logic to destroy their own
|
||||||
* state.
|
* state.
|
||||||
*/
|
*/
|
||||||
private[spark] def doDestroy(blocking: Boolean)
|
protected def doDestroy(blocking: Boolean)
|
||||||
|
|
||||||
/** Check if this broadcast is valid. If not valid, exception is thrown. */
|
/** Check if this broadcast is valid. If not valid, exception is thrown. */
|
||||||
private[spark] def assertValid() {
|
protected def assertValid() {
|
||||||
if (!_isValid) {
|
if (!_isValid) {
|
||||||
throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
|
throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,9 +40,9 @@ private[spark] class HttpBroadcast[T: ClassTag](
|
||||||
@transient var value_ : T, isLocal: Boolean, id: Long)
|
@transient var value_ : T, isLocal: Boolean, id: Long)
|
||||||
extends Broadcast[T](id) with Logging with Serializable {
|
extends Broadcast[T](id) with Logging with Serializable {
|
||||||
|
|
||||||
def getValue = value_
|
override protected def getValue() = value_
|
||||||
|
|
||||||
val blockId = BroadcastBlockId(id)
|
private val blockId = BroadcastBlockId(id)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
|
* Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
|
||||||
|
@ -60,14 +60,14 @@ private[spark] class HttpBroadcast[T: ClassTag](
|
||||||
/**
|
/**
|
||||||
* Remove all persisted state associated with this HTTP broadcast on the executors.
|
* Remove all persisted state associated with this HTTP broadcast on the executors.
|
||||||
*/
|
*/
|
||||||
def doUnpersist(blocking: Boolean) {
|
override protected def doUnpersist(blocking: Boolean) {
|
||||||
HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
|
HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove all persisted state associated with this HTTP broadcast on the executors and driver.
|
* Remove all persisted state associated with this HTTP broadcast on the executors and driver.
|
||||||
*/
|
*/
|
||||||
def doDestroy(blocking: Boolean) {
|
override protected def doDestroy(blocking: Boolean) {
|
||||||
HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
|
HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ private[spark] class HttpBroadcast[T: ClassTag](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[spark] object HttpBroadcast extends Logging {
|
private[broadcast] object HttpBroadcast extends Logging {
|
||||||
private var initialized = false
|
private var initialized = false
|
||||||
private var broadcastDir: File = null
|
private var broadcastDir: File = null
|
||||||
private var compress: Boolean = false
|
private var compress: Boolean = false
|
||||||
|
@ -160,7 +160,7 @@ private[spark] object HttpBroadcast extends Logging {
|
||||||
|
|
||||||
def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)
|
def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)
|
||||||
|
|
||||||
def write(id: Long, value: Any) {
|
private def write(id: Long, value: Any) {
|
||||||
val file = getFile(id)
|
val file = getFile(id)
|
||||||
val out: OutputStream = {
|
val out: OutputStream = {
|
||||||
if (compress) {
|
if (compress) {
|
||||||
|
@ -176,7 +176,7 @@ private[spark] object HttpBroadcast extends Logging {
|
||||||
files += file
|
files += file
|
||||||
}
|
}
|
||||||
|
|
||||||
def read[T: ClassTag](id: Long): T = {
|
private def read[T: ClassTag](id: Long): T = {
|
||||||
logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
|
logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
|
||||||
val url = serverUri + "/" + BroadcastBlockId(id).name
|
val url = serverUri + "/" + BroadcastBlockId(id).name
|
||||||
|
|
||||||
|
|
|
@ -27,21 +27,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
|
||||||
* [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
|
* [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
|
||||||
*/
|
*/
|
||||||
class HttpBroadcastFactory extends BroadcastFactory {
|
class HttpBroadcastFactory extends BroadcastFactory {
|
||||||
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
|
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
|
||||||
HttpBroadcast.initialize(isDriver, conf, securityMgr)
|
HttpBroadcast.initialize(isDriver, conf, securityMgr)
|
||||||
}
|
}
|
||||||
|
|
||||||
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
|
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
|
||||||
new HttpBroadcast[T](value_, isLocal, id)
|
new HttpBroadcast[T](value_, isLocal, id)
|
||||||
|
|
||||||
def stop() { HttpBroadcast.stop() }
|
override def stop() { HttpBroadcast.stop() }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove all persisted state associated with the HTTP broadcast with the given ID.
|
* Remove all persisted state associated with the HTTP broadcast with the given ID.
|
||||||
* @param removeFromDriver Whether to remove state from the driver
|
* @param removeFromDriver Whether to remove state from the driver
|
||||||
* @param blocking Whether to block until unbroadcasted
|
* @param blocking Whether to block until unbroadcasted
|
||||||
*/
|
*/
|
||||||
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
|
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
|
||||||
HttpBroadcast.unpersist(id, removeFromDriver, blocking)
|
HttpBroadcast.unpersist(id, removeFromDriver, blocking)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.spark.broadcast
|
||||||
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
|
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
|
||||||
|
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import scala.math
|
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
|
|
||||||
import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
|
import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
|
||||||
|
@ -49,19 +48,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
|
||||||
@transient var value_ : T, isLocal: Boolean, id: Long)
|
@transient var value_ : T, isLocal: Boolean, id: Long)
|
||||||
extends Broadcast[T](id) with Logging with Serializable {
|
extends Broadcast[T](id) with Logging with Serializable {
|
||||||
|
|
||||||
def getValue = value_
|
override protected def getValue() = value_
|
||||||
|
|
||||||
val broadcastId = BroadcastBlockId(id)
|
private val broadcastId = BroadcastBlockId(id)
|
||||||
|
|
||||||
TorrentBroadcast.synchronized {
|
TorrentBroadcast.synchronized {
|
||||||
SparkEnv.get.blockManager.putSingle(
|
SparkEnv.get.blockManager.putSingle(
|
||||||
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
|
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
|
||||||
}
|
}
|
||||||
|
|
||||||
@transient var arrayOfBlocks: Array[TorrentBlock] = null
|
@transient private var arrayOfBlocks: Array[TorrentBlock] = null
|
||||||
@transient var totalBlocks = -1
|
@transient private var totalBlocks = -1
|
||||||
@transient var totalBytes = -1
|
@transient private var totalBytes = -1
|
||||||
@transient var hasBlocks = 0
|
@transient private var hasBlocks = 0
|
||||||
|
|
||||||
if (!isLocal) {
|
if (!isLocal) {
|
||||||
sendBroadcast()
|
sendBroadcast()
|
||||||
|
@ -70,7 +69,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
|
||||||
/**
|
/**
|
||||||
* Remove all persisted state associated with this Torrent broadcast on the executors.
|
* Remove all persisted state associated with this Torrent broadcast on the executors.
|
||||||
*/
|
*/
|
||||||
def doUnpersist(blocking: Boolean) {
|
override protected def doUnpersist(blocking: Boolean) {
|
||||||
TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
|
TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,11 +77,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](
|
||||||
* Remove all persisted state associated with this Torrent broadcast on the executors
|
* Remove all persisted state associated with this Torrent broadcast on the executors
|
||||||
* and driver.
|
* and driver.
|
||||||
*/
|
*/
|
||||||
def doDestroy(blocking: Boolean) {
|
override protected def doDestroy(blocking: Boolean) {
|
||||||
TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
|
TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
|
||||||
}
|
}
|
||||||
|
|
||||||
def sendBroadcast() {
|
private def sendBroadcast() {
|
||||||
val tInfo = TorrentBroadcast.blockifyObject(value_)
|
val tInfo = TorrentBroadcast.blockifyObject(value_)
|
||||||
totalBlocks = tInfo.totalBlocks
|
totalBlocks = tInfo.totalBlocks
|
||||||
totalBytes = tInfo.totalBytes
|
totalBytes = tInfo.totalBytes
|
||||||
|
@ -159,7 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
|
||||||
hasBlocks = 0
|
hasBlocks = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
def receiveBroadcast(): Boolean = {
|
private def receiveBroadcast(): Boolean = {
|
||||||
// Receive meta-info about the size of broadcast data,
|
// Receive meta-info about the size of broadcast data,
|
||||||
// the number of chunks it is divided into, etc.
|
// the number of chunks it is divided into, etc.
|
||||||
val metaId = BroadcastBlockId(id, "meta")
|
val metaId = BroadcastBlockId(id, "meta")
|
||||||
|
@ -211,7 +210,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private[spark] object TorrentBroadcast extends Logging {
|
private[broadcast] object TorrentBroadcast extends Logging {
|
||||||
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
|
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
|
||||||
private var initialized = false
|
private var initialized = false
|
||||||
private var conf: SparkConf = null
|
private var conf: SparkConf = null
|
||||||
|
@ -272,17 +271,19 @@ private[spark] object TorrentBroadcast extends Logging {
|
||||||
* Remove all persisted blocks associated with this torrent broadcast on the executors.
|
* Remove all persisted blocks associated with this torrent broadcast on the executors.
|
||||||
* If removeFromDriver is true, also remove these persisted blocks on the driver.
|
* If removeFromDriver is true, also remove these persisted blocks on the driver.
|
||||||
*/
|
*/
|
||||||
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
|
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
|
||||||
|
synchronized {
|
||||||
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
|
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private[spark] case class TorrentBlock(
|
private[broadcast] case class TorrentBlock(
|
||||||
blockID: Int,
|
blockID: Int,
|
||||||
byteArray: Array[Byte])
|
byteArray: Array[Byte])
|
||||||
extends Serializable
|
extends Serializable
|
||||||
|
|
||||||
private[spark] case class TorrentInfo(
|
private[broadcast] case class TorrentInfo(
|
||||||
@transient arrayOfBlocks: Array[TorrentBlock],
|
@transient arrayOfBlocks: Array[TorrentBlock],
|
||||||
totalBlocks: Int,
|
totalBlocks: Int,
|
||||||
totalBytes: Int)
|
totalBytes: Int)
|
||||||
|
|
|
@ -28,21 +28,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
|
||||||
*/
|
*/
|
||||||
class TorrentBroadcastFactory extends BroadcastFactory {
|
class TorrentBroadcastFactory extends BroadcastFactory {
|
||||||
|
|
||||||
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
|
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
|
||||||
TorrentBroadcast.initialize(isDriver, conf)
|
TorrentBroadcast.initialize(isDriver, conf)
|
||||||
}
|
}
|
||||||
|
|
||||||
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
|
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
|
||||||
new TorrentBroadcast[T](value_, isLocal, id)
|
new TorrentBroadcast[T](value_, isLocal, id)
|
||||||
|
|
||||||
def stop() { TorrentBroadcast.stop() }
|
override def stop() { TorrentBroadcast.stop() }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove all persisted state associated with the torrent broadcast with the given ID.
|
* Remove all persisted state associated with the torrent broadcast with the given ID.
|
||||||
* @param removeFromDriver Whether to remove state from the driver.
|
* @param removeFromDriver Whether to remove state from the driver.
|
||||||
* @param blocking Whether to block until unbroadcasted
|
* @param blocking Whether to block until unbroadcasted
|
||||||
*/
|
*/
|
||||||
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
|
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
|
||||||
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
|
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue