Error handling for disk I/O in DiskSpillingCache
Also renamed the property spark.DiskSpillingCache.cacheDir to spark.diskSpillingCache.cacheDir in order to follow conventions.
parent 12ff0d2dc3
commit a4c04f3f6f
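The renamed property is read with System.getProperty and falls back to java.io.tmpdir, so the spill directory can be overridden from driver code before the cache is first used. A minimal sketch of doing so, assuming nothing beyond plain JVM system properties — the object name and directory path are illustrative, only the property name comes from this commit:

object CacheDirSetup {
  def main(args: Array[String]): Unit = {
    // Override the java.io.tmpdir default; the path is an example.
    System.setProperty("spark.diskSpillingCache.cacheDir", "/mnt/spark-cache")
    println(System.getProperty("spark.diskSpillingCache.cacheDir"))
  }
}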
@@ -2,12 +2,12 @@ package spark
 
 import java.io.File
 import java.io.{FileOutputStream,FileInputStream}
+import java.io.IOException
 import java.util.LinkedHashMap
 import java.util.UUID
 
-// TODO: error handling
 // TODO: cache into a separate directory using Utils.createTempDir
-// TODO: after reading an entry from disk, put it into the cache
+// TODO: clean up disk cache afterwards
 
 class DiskSpillingCache extends BoundedMemoryCache {
   private val diskMap = new LinkedHashMap[Any, File](32, 0.75f, true)
@@ -21,14 +21,22 @@ class DiskSpillingCache extends BoundedMemoryCache {
 
       case _ => diskMap.get(key) match {
        case file: Any => // found on disk
-          val startTime = System.currentTimeMillis
-          val bytes = new Array[Byte](file.length.toInt)
-          new FileInputStream(file).read(bytes)
-          val timeTaken = System.currentTimeMillis - startTime
-          logInfo("Reading key %s of size %d bytes from disk took %d ms".format(
-            key, file.length, timeTaken))
-          super.put(key, bytes)
-          ser.deserialize(bytes.asInstanceOf[Array[Byte]])
+          try {
+            val startTime = System.currentTimeMillis
+            val bytes = new Array[Byte](file.length.toInt)
+            new FileInputStream(file).read(bytes)
+            val timeTaken = System.currentTimeMillis - startTime
+            logInfo("Reading key %s of size %d bytes from disk took %d ms".format(
+              key, file.length, timeTaken))
+            super.put(key, bytes)
+            ser.deserialize(bytes.asInstanceOf[Array[Byte]])
+          } catch {
+            case e: IOException =>
+              logWarning("Failed to read key %s from disk at %s: %s".format(
+                key, file.getPath(), e.getMessage()))
+              diskMap.remove(key) // remove dead entry
+              null
+          }
 
         case _ => // not found
           null
@@ -50,12 +58,19 @@ class DiskSpillingCache extends BoundedMemoryCache {
     logInfo("Spilling key %s of size %d to make space".format(
       key, entry.size))
     val cacheDir = System.getProperty(
-      "spark.DiskSpillingCache.cacheDir",
+      "spark.diskSpillingCache.cacheDir",
      System.getProperty("java.io.tmpdir"))
     val file = new File(cacheDir, "spark-dsc-" + UUID.randomUUID.toString)
-    val stream = new FileOutputStream(file)
-    stream.write(entry.value.asInstanceOf[Array[Byte]])
-    stream.close()
-    diskMap.put(key, file)
+    try {
+      val stream = new FileOutputStream(file)
+      stream.write(entry.value.asInstanceOf[Array[Byte]])
+      stream.close()
+      diskMap.put(key, file)
+    } catch {
+      case e: IOException =>
+        logWarning("Failed to spill key %s to disk at %s: %s".format(
+          key, file.getPath(), e.getMessage()))
+        // Do nothing and let the entry be discarded
+    }
   }
 }
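The read path above recovers from a failed disk read by logging a warning, dropping the dead diskMap entry, and returning null instead of letting the IOException escape. A standalone sketch of that recovery pattern, assuming nothing from Spark itself — readSpilled and its println logging are stand-ins for the cache's lookup path and logWarning, and the read loop is slightly stricter than the commit's single read(bytes) call:

import java.io.{File, FileInputStream, FileOutputStream, IOException}

object SpillReadDemo {
  // Read a spilled value back from disk, recovering the way the commit
  // does: on IOException, warn and report the entry as dead (None here,
  // null in the cache) rather than propagating the error.
  def readSpilled(file: File): Option[Array[Byte]] = {
    try {
      val bytes = new Array[Byte](file.length.toInt)
      val stream = new FileInputStream(file)
      try {
        var read = 0
        while (read < bytes.length) {
          val n = stream.read(bytes, read, bytes.length - read)
          if (n < 0) throw new IOException("unexpected end of " + file)
          read += n
        }
      } finally {
        stream.close()
      }
      Some(bytes)
    } catch {
      case e: IOException =>
        println("Failed to read %s: %s".format(file.getPath(), e.getMessage()))
        None // caller should drop its map entry, as the commit does with diskMap.remove
    }
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("spark-dsc-", ".spill")
    val out = new FileOutputStream(file)
    out.write("hello".getBytes("UTF-8"))
    out.close()
    readSpilled(file).foreach(bytes => println(new String(bytes, "UTF-8")))
    file.delete() // a later readSpilled would now warn and return None
  }
}

The write side makes the complementary choice: if the spill itself fails, the cache logs a warning and lets the entry be discarded rather than registering a diskMap entry that points at a bad file.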