Refacoring...

This commit is contained in:
Mosharaf Chowdhury 2011-04-26 17:41:31 -07:00
parent b8ab7862b8
commit 65848da8df
3 changed files with 53 additions and 33 deletions

View file

@ -19,11 +19,6 @@ trait Broadcast[T] {
override def toString = "spark.Broadcast(" + uuid + ")" override def toString = "spark.Broadcast(" + uuid + ")"
} }
trait BroadcastFactory {
def initialize (isMaster: Boolean): Unit
def newBroadcast[T] (value_ : T, isLocal: Boolean): Broadcast[T]
}
object Broadcast object Broadcast
extends Logging { extends Logging {
// Messages // Messages
@ -219,34 +214,6 @@ extends Logging {
} }
} }
// CHANGED: Keep track of the blockSize for THIS broadcast variable.
// Broadcast.BlockSize is expected to be updated across different broadcasts
@serializable
case class SourceInfo (val hostAddress: String,
val listenPort: Int,
val totalBlocks: Int = SourceInfo.UnusedParam,
val totalBytes: Int = SourceInfo.UnusedParam,
val blockSize: Int = Broadcast.BlockSize)
extends Comparable[SourceInfo] with Logging {
var currentLeechers = 0
var receptionFailed = false
var hasBlocks = 0
var hasBlocksBitVector: BitSet = new BitSet (totalBlocks)
// Ascending sort based on leecher count
def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers)
}
object SourceInfo {
// Constants for special values of listenPort
val TxNotStartedRetry = -1
val TxOverGoToHDFS = 0
// Other constants
val StopBroadcast = -2
val UnusedParam = 0
}
@serializable @serializable
case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { } case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { }

View file

@ -0,0 +1,12 @@
package spark.broadcast
/**
* An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
trait BroadcastFactory {
def initialize (isMaster: Boolean): Unit
def newBroadcast[T] (value_ : T, isLocal: Boolean): Broadcast[T]
}

View file

@ -0,0 +1,41 @@
package spark.broadcast
import java.util.BitSet
import spark._
/**
* Used to keep and pass around information of peers involved in a broadcast
*
* CHANGED: Keep track of the blockSize for THIS broadcast variable.
* Broadcast.BlockSize is expected to be updated across different broadcasts
*/
@serializable
case class SourceInfo (val hostAddress: String,
val listenPort: Int,
val totalBlocks: Int = SourceInfo.UnusedParam,
val totalBytes: Int = SourceInfo.UnusedParam,
val blockSize: Int = Broadcast.BlockSize)
extends Comparable[SourceInfo] with Logging {
var currentLeechers = 0
var receptionFailed = false
var hasBlocks = 0
var hasBlocksBitVector: BitSet = new BitSet (totalBlocks)
// Ascending sort based on leecher count
def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers)
}
/**
* Helper Object of SourceInfo for its constants
*/
object SourceInfo {
// Constants for special values of listenPort
val TxNotStartedRetry = -1
val TxOverGoToHDFS = 0
// Other constants
val StopBroadcast = -2
val UnusedParam = 0
}