Added metadata cleaner to HttpBroadcast to clean up old broadcast files.

Tathagata Das 2012-12-03 22:37:31 -08:00
parent 609e00d599
commit a69a82be26


@@ -11,6 +11,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import spark._
import spark.storage.StorageLevel
import util.{MetadataCleaner, TimeStampedHashSet}

private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
@@ -64,6 +65,10 @@ private object HttpBroadcast extends Logging {
private var serverUri: String = null
private var server: HttpServer = null
private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)

def initialize(isMaster: Boolean) {
synchronized {
if (!initialized) {
@@ -85,6 +90,7 @@ private object HttpBroadcast extends Logging {
server = null
}
initialized = false
cleaner.cancel()
}
}
@@ -108,6 +114,7 @@ private object HttpBroadcast extends Logging {
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file.getAbsolutePath
}

def read[T](id: Long): T = {
@@ -123,4 +130,21 @@ private object HttpBroadcast extends Logging {
serIn.close()
obj
}

def cleanup(cleanupTime: Long) {
val iterator = files.internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
val (file, time) = (entry.getKey, entry.getValue)
if (time < cleanupTime) {
try {
iterator.remove()
new File(file.toString).delete()
logInfo("Deleted broadcast file '" + file + "'")
} catch {
case e: Exception => logWarning("Could not delete broadcast file '" + file + "'", e)
}
}
}
}
}
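
Note: in this change, every file written by HttpBroadcast.write is recorded in a TimeStampedHashSet, and the new cleanup(cleanupTime) method is the callback handed to MetadataCleaner, which is expected to invoke it periodically with a cutoff timestamp so that entries recorded before that cutoff are deleted. Below is a minimal sketch of such a periodic driver. It uses a hypothetical PeriodicCleaner class in place of the real spark.util.MetadataCleaner; the Timer-based scheduling, the class name, and the delaySeconds parameter are assumptions for illustration, not taken from this diff.

import java.util.{Timer, TimerTask}

// Hypothetical stand-in for spark.util.MetadataCleaner, shown only to illustrate
// how the cleanup(cleanupTime) callback above could be driven periodically.
class PeriodicCleaner(name: String, cleanupFunc: Long => Unit, delaySeconds: Int) {
  private val periodMillis = delaySeconds * 1000L
  private val timer = new Timer(name + " cleanup timer", true)  // daemon thread

  // Fire the cleanup function on a fixed period; everything recorded before
  // the computed cutoff timestamp is considered stale.
  timer.schedule(new TimerTask {
    override def run() {
      cleanupFunc(System.currentTimeMillis() - periodMillis)
    }
  }, periodMillis, periodMillis)

  def cancel() { timer.cancel() }
}

// Usage mirroring the diff above (the delay value is illustrative):
// private val cleaner = new PeriodicCleaner("HttpBroadcast", cleanup, 3600)

The cancel() method here corresponds to the cleaner.cancel() call added to stop() in the diff, which shuts the periodic task down when the broadcast server is stopped.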