Further bug fix to HttpBroadcast
This commit is contained in:
parent
8ed662862e
commit
4b05798c06
|
@ -34,7 +34,7 @@ extends Broadcast[T] with Logging with Serializable {
|
|||
} else {
|
||||
logInfo("Started reading broadcast variable " + uuid)
|
||||
val start = System.nanoTime
|
||||
value_ = HttpBroadcast.read(uuid).asInstanceOf[T]
|
||||
value_ = HttpBroadcast.read[T](uuid)
|
||||
HttpBroadcast.values.put(uuid, 0, value_)
|
||||
val time = (System.nanoTime - start) / 1e9
|
||||
logInfo("Reading broadcast variable " + uuid + " took " + time + " s")
|
||||
|
@ -95,7 +95,7 @@ private object HttpBroadcast extends Logging {
|
|||
serOut.close()
|
||||
}
|
||||
|
||||
def read(uuid: UUID): Any = {
|
||||
def read[T](uuid: UUID): T = {
|
||||
val url = serverUri + "/broadcast-" + uuid
|
||||
var in = if (compress) {
|
||||
new LZFInputStream(new URL(url).openStream()) // Does its own buffering
|
||||
|
@ -104,7 +104,7 @@ private object HttpBroadcast extends Logging {
|
|||
}
|
||||
val ser = SparkEnv.get.serializer.newInstance()
|
||||
val serIn = ser.inputStream(in)
|
||||
val obj = serIn.readObject()
|
||||
val obj = serIn.readObject[T]()
|
||||
serIn.close()
|
||||
obj
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue