[SPARK-6762]Fix potential resource leaks in CheckPoint CheckpointWriter and CheckpointReader

The close action should be placed within finally block to avoid the potential resource leaks

Author: lisurprise <zhichao.li@intel.com>

Closes #5407 from zhichao-li/master and squashes the following commits:

065999f [lisurprise] add guard for null
ef862d6 [lisurprise] remove fs.close
a754adc [lisurprise] refactor with tryWithSafeFinally
824adb3 [lisurprise] close before validation
c877da7 [lisurprise] Fix potential resource leaks
This commit is contained in:
lisurprise 2015-04-13 12:18:05 +01:00 committed by Sean Owen
parent 950645d597
commit cadd7d72c5
2 changed files with 31 additions and 19 deletions

View file

@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkException, SparkConf, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.util.{MetadataCleaner, Utils}
import org.apache.spark.streaming.scheduler.JobGenerator
@ -139,8 +139,11 @@ class CheckpointWriter(
// Write checkpoint to temp file
fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
fos.write(bytes)
fos.close()
Utils.tryWithSafeFinally {
fos.write(bytes)
} {
fos.close()
}
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
@ -187,9 +190,11 @@ class CheckpointWriter(
val bos = new ByteArrayOutputStream()
val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
oos.writeObject(checkpoint)
oos.close()
bos.close()
Utils.tryWithSafeFinally {
oos.writeObject(checkpoint)
} {
oos.close()
}
try {
executor.execute(new CheckpointWriteHandler(
checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
@ -248,18 +253,24 @@ object CheckpointReader extends Logging {
checkpointFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file " + file)
try {
val fis = fs.open(file)
// ObjectInputStream uses the last defined user-defined class loader in the stack
// to find classes, which maybe the wrong class loader. Hence, a inherited version
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
val zis = compressionCodec.compressedInputStream(fis)
val ois = new ObjectInputStreamWithLoader(zis,
Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
fs.close()
var ois: ObjectInputStreamWithLoader = null
var cp: Checkpoint = null
Utils.tryWithSafeFinally {
val fis = fs.open(file)
// ObjectInputStream uses the last defined user-defined class loader in the stack
// to find classes, which maybe the wrong class loader. Hence, a inherited version
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
val zis = compressionCodec.compressedInputStream(fis)
ois = new ObjectInputStreamWithLoader(zis,
Thread.currentThread().getContextClassLoader)
cp = ois.readObject.asInstanceOf[Checkpoint]
} {
if (ois != null) {
ois.close()
}
}
cp.validate()
logInfo("Checkpoint successfully loaded from file " + file)
logInfo("Checkpoint was generated at time " + cp.checkpointTime)

View file

@ -72,7 +72,8 @@ object RawTextSender extends Logging {
} catch {
case e: IOException =>
logError("Client disconnected")
socket.close()
} finally {
socket.close()
}
}
}