[SPARK-6560][CORE] Do not suppress exceptions from writer.write.

If there is a failure in the Hadoop backend while calling
writer.write, we should remember this original exception,
and try to call writer.close(), but if that fails as well,
still report the original exception.

Note that if writer.write fails, the writer has likely been left
in an invalid state, which makes it more likely that writer.close
will also fail, and that in turn increases the chances of
writer.write's original exception being suppressed.

This patch introduces an admittedly potentially too cute
Utils.tryWithSafeFinally method to handle the try/finally
gyrations.
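
To make the failure mode concrete, here is a minimal, self-contained sketch.
It is not Spark code: SuppressionDemo, FlakyWriter, and the error messages are
invented for illustration, and the helper shown only mirrors the contract of
the Utils.tryWithSafeFinally method added in the diff below. It shows how a
plain try/finally lets the exception from close() replace the more meaningful
one from write(), while the safe variant preserves the original:

// Minimal sketch only: SuppressionDemo, FlakyWriter, and the error messages
// are invented for illustration and are not part of this patch.
object SuppressionDemo {

  class FlakyWriter {
    // The original, meaningful failure.
    def write(s: String): Unit = throw new RuntimeException("write failed: disk full")
    // A follow-on failure caused by the writer's now-invalid state.
    def close(): Unit = throw new IllegalStateException("close failed")
  }

  // Same contract as the helper added by this patch: run `block`, always run
  // `finallyBlock`, but never let an exception thrown by `finallyBlock`
  // replace the one thrown by `block`.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        // Swallow the cleanup failure only when a more meaningful exception
        // from the body is already propagating.
        case t: Throwable => if (original == null) throw t
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val w1 = new FlakyWriter
    try {
      try w1.write("data") finally w1.close()
    } catch {
      // Plain try/finally: the close() exception wins and "disk full" is lost.
      case t: Throwable => println(s"plain try/finally reports: ${t.getMessage}")
    }

    val w2 = new FlakyWriter
    try {
      tryWithSafeFinally { w2.write("data") } { w2.close() }
    } catch {
      // Safe variant: the original write() failure is what the caller sees.
      case t: Throwable => println(s"tryWithSafeFinally reports: ${t.getMessage}")
    }
  }
}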

Author: Stephen Haberman <stephen@exigencecorp.com>

Closes #5223 from stephenh/do_not_suppress_writer_exception and squashes the following commits:

c7ad53f [Stephen Haberman] [SPARK-6560][CORE] Do not suppress exceptions from writer.write.
Stephen Haberman authored 2015-04-03 09:48:37 +01:00, committed by Sean Owen
parent 82701ee25f
commit b0d884f044
12 changed files with 118 additions and 62 deletions


@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
   def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
     val out = new ByteArrayOutputStream
     val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
-    // Since statuses can be modified in parallel, sync on it
-    statuses.synchronized {
-      objOut.writeObject(statuses)
+    Utils.tryWithSafeFinally {
+      // Since statuses can be modified in parallel, sync on it
+      statuses.synchronized {
+        objOut.writeObject(statuses)
+      }
+    } {
+      objOut.close()
     }
-    objOut.close()
     out.toByteArray
   }


@@ -614,9 +614,9 @@ private[spark] object PythonRDD extends Logging {
       try {
         val sock = serverSocket.accept()
         val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
-        try {
+        Utils.tryWithSafeFinally {
           writeIteratorToStream(items, out)
-        } finally {
+        } {
           out.close()
         }
       } catch {
@@ -862,9 +862,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
       val file = File.createTempFile("broadcast", "", dir)
       path = file.getAbsolutePath
       val out = new FileOutputStream(file)
-      try {
+      Utils.tryWithSafeFinally {
         Utils.copyStream(in, out)
-      } finally {
+      } {
         out.close()
       }
     }


@@ -165,7 +165,7 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def write(id: Long, value: Any) {
     val file = getFile(id)
     val fileOutputStream = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       val out: OutputStream = {
         if (compress) {
           compressionCodec.compressedOutputStream(fileOutputStream)
@@ -175,10 +175,13 @@ private[broadcast] object HttpBroadcast extends Logging {
       }
       val ser = SparkEnv.get.serializer.newInstance()
       val serOut = ser.serializeStream(out)
-      serOut.writeObject(value)
-      serOut.close()
+      Utils.tryWithSafeFinally {
+        serOut.writeObject(value)
+      } {
+        serOut.close()
+      }
       files += file
-    } finally {
+    } {
       fileOutputStream.close()
     }
   }
@@ -212,9 +215,11 @@ private[broadcast] object HttpBroadcast extends Logging {
     }
     val ser = SparkEnv.get.serializer.newInstance()
     val serIn = ser.deserializeStream(in)
-    val obj = serIn.readObject[T]()
-    serIn.close()
-    obj
+    Utils.tryWithSafeFinally {
+      serIn.readObject[T]()
+    } {
+      serIn.close()
+    }
   }
 
   /**


@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 import akka.serialization.Serialization
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 /**
@@ -59,9 +60,9 @@ private[master] class FileSystemPersistenceEngine(
     val serializer = serialization.findSerializerFor(value)
     val serialized = serializer.toBinary(value)
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       out.write(serialized)
-    } finally {
+    } {
       out.close()
     }
   }


@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
 import com.google.common.base.Charsets
 
 import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils
 
 /**
  * A client that submits applications to the standalone Master using a REST protocol.
@@ -148,8 +149,11 @@ private[deploy] class StandaloneRestClient extends Logging {
     conn.setRequestProperty("charset", "utf-8")
     conn.setDoOutput(true)
     val out = new DataOutputStream(conn.getOutputStream)
-    out.write(json.getBytes(Charsets.UTF_8))
-    out.close()
+    Utils.tryWithSafeFinally {
+      out.write(json.getBytes(Charsets.UTF_8))
+    } {
+      out.close()
+    }
     readResponse(conn)
   }


@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -112,8 +113,11 @@ private[spark] object CheckpointRDD extends Logging {
     }
     val serializer = env.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
-    serializeStream.writeAll(iterator)
-    serializeStream.close()
+    Utils.tryWithSafeFinally {
+      serializeStream.writeAll(iterator)
+    } {
+      serializeStream.close()
+    }
 
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.exists(finalOutputPath)) {


@@ -995,7 +995,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       require(writer != null, "Unable to obtain RecordWriter")
       var recordsWritten = 0L
-      try {
+      Utils.tryWithSafeFinally {
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
@@ -1004,7 +1004,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } finally {
+      } {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
@@ -1068,7 +1068,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
       var recordsWritten = 0L
-      try {
+
+      Utils.tryWithSafeFinally {
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1077,7 +1078,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } finally {
+      } {
         writer.close()
       }
       writer.commit()


@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
+import org.apache.spark.util.Utils
 
 import IndexShuffleBlockManager.NOOP_REDUCE_ID
@@ -78,16 +79,15 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
   def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
     val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
-    try {
+    Utils.tryWithSafeFinally {
       // We take in lengths of each block, need to convert it to offsets.
       var offset = 0L
       out.writeLong(offset)
-
       for (length <- lengths) {
         offset += length
         out.writeLong(offset)
       }
-    } finally {
+    } {
       out.close()
     }
   }


@@ -23,6 +23,7 @@ import java.nio.channels.FileChannel
 import org.apache.spark.Logging
 import org.apache.spark.serializer.{SerializationStream, Serializer}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.util.Utils
 
 /**
  * An interface for writing JVM objects to some underlying storage. This interface allows
@@ -140,14 +141,17 @@ private[spark] class DiskBlockObjectWriter(
   override def close() {
     if (initialized) {
-      if (syncWrites) {
-        // Force outstanding writes to disk and track how long it takes
-        objOut.flush()
-        callWithTiming {
-          fos.getFD.sync()
+      Utils.tryWithSafeFinally {
+        if (syncWrites) {
+          // Force outstanding writes to disk and track how long it takes
+          objOut.flush()
+          callWithTiming {
+            fos.getFD.sync()
+          }
         }
+      } {
+        objOut.close()
       }
-      objOut.close()
 
       channel = null
       bs = null


@@ -46,10 +46,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val channel = new FileOutputStream(file).getChannel
-    while (bytes.remaining > 0) {
-      channel.write(bytes)
+    Utils.tryWithSafeFinally {
+      while (bytes.remaining > 0) {
+        channel.write(bytes)
+      }
+    } {
+      channel.close()
     }
-    channel.close()
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
@@ -75,9 +78,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
     try {
-      try {
+      Utils.tryWithSafeFinally {
        blockManager.dataSerializeStream(blockId, outputStream, values)
-      } finally {
+      } {
        // Close outputStream here because it should be closed before file is deleted.
        outputStream.close()
      }
@@ -106,8 +109,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
 
   private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
     val channel = new RandomAccessFile(file, "r").getChannel
-
-    try {
+    Utils.tryWithSafeFinally {
       // For small files, directly read rather than memory map
       if (length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(length.toInt)
@@ -123,7 +125,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
       } else {
         Some(channel.map(MapMode.READ_ONLY, offset, length))
       }
-    } finally {
+    } {
       channel.close()
     }
   }


@@ -313,7 +313,7 @@ private[spark] object Utils extends Logging {
       transferToEnabled: Boolean = false): Long =
   {
     var count = 0L
-    try {
+    tryWithSafeFinally {
       if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
         && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy performance.
@@ -353,7 +353,7 @@ private[spark] object Utils extends Logging {
         }
       }
       count
-    } finally {
+    } {
       if (closeStreams) {
         try {
           in.close()
@@ -1214,6 +1214,44 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code, then a finally block, but if exceptions happen in
+   * the finally block, do not suppress the original exception.
+   *
+   * This is primarily an issue with `finally { out.close() }` blocks, where
+   * close needs to be called to clean up `out`, but if an exception happened
+   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+   * fail as well. This would then suppress the original/likely more meaningful
+   * exception from the original `out.write` call.
+   */
+  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
+    // It would be nice to find a method on Try that did this
+    var originalThrowable: Throwable = null
+    try {
+      block
+    } catch {
+      case t: Throwable =>
+        // Purposefully not using NonFatal, because even fatal exceptions
+        // we don't want to have our finallyBlock suppress
+        originalThrowable = t
+        throw originalThrowable
+    } finally {
+      try {
+        finallyBlock
+      } catch {
+        case t: Throwable =>
+          if (originalThrowable != null) {
+            // We could do originalThrowable.addSuppressed(t), but it's
+            // not available in JDK 1.6.
+            logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
+            throw originalThrowable
+          } else {
+            throw t
+          }
+      }
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when
@@ -2074,7 +2112,7 @@ private[spark] class RedirectThread(
   override def run() {
     scala.util.control.Exception.ignoring(classOf[IOException]) {
       // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-      try {
+      Utils.tryWithSafeFinally {
         val buf = new Array[Byte](1024)
         var len = in.read(buf)
         while (len != -1) {
@@ -2082,7 +2120,7 @@ private[spark] class RedirectThread(
           out.flush()
           len = in.read(buf)
         }
-      } finally {
+      } {
         if (propagateEof) {
           out.close()
         }

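For reference, the helper added above is called with two curried blocks, the body
first and the cleanup second, and it returns the body's value. A small usage
sketch under assumptions: only the Utils.tryWithSafeFinally signature comes from
this patch; the package, object, file path, and payload below are made up, and
the sketch assumes it is compiled inside Spark's org.apache.spark namespace
because Utils is private[spark]:

package org.apache.spark.demo  // hypothetical package, needed only because Utils is private[spark]

import java.io.FileOutputStream

import org.apache.spark.util.Utils

object TryWithSafeFinallyUsage {
  // Writes the payload and returns the number of bytes written.
  // "counts.bin" and the payload are made-up example values.
  def writePayload(payload: Array[Byte]): Long = {
    val out = new FileOutputStream("counts.bin")
    Utils.tryWithSafeFinally {
      out.write(payload)
      payload.length.toLong  // the first block supplies the return value
    } {
      out.close()  // always runs, but can no longer mask a failure from write
    }
  }
}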

@@ -728,25 +728,19 @@ private[spark] class ExternalSorter[K, V, C](
       // this simple we spill out the current in-memory collection so that everything is in files.
       spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
       partitionWriters.foreach(_.commitAndClose())
-      var out: FileOutputStream = null
-      var in: FileInputStream = null
+      val out = new FileOutputStream(outputFile, true)
       val writeStartTime = System.nanoTime
-      try {
-        out = new FileOutputStream(outputFile, true)
+      util.Utils.tryWithSafeFinally {
         for (i <- 0 until numPartitions) {
-          in = new FileInputStream(partitionWriters(i).fileSegment().file)
-          val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
-          in.close()
-          in = null
-          lengths(i) = size
-        }
-      } finally {
-        if (out != null) {
-          out.close()
-        }
-        if (in != null) {
-          in.close()
+          val in = new FileInputStream(partitionWriters(i).fileSegment().file)
+          util.Utils.tryWithSafeFinally {
+            lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
+          } {
+            in.close()
+          }
         }
+      } {
+        out.close()
         context.taskMetrics.shuffleWriteMetrics.foreach(
           _.incShuffleWriteTime(System.nanoTime - writeStartTime))
       }