[SPARK-6560][CORE] Do not suppress exceptions from writer.write.
If there is a failure in the Hadoop backend while calling writer.write, we should remember the original exception and still try to call writer.close(); if close fails as well, we should report the original exception. Note that if writer.write fails, the writer was likely left in an invalid state, which in turn makes it more likely that writer.close will also fail, and thus more likely that writer.write's exception would be suppressed.

This patch introduces an admittedly potentially-too-cute Utils.tryWithSafeFinally method to handle the try/finally gyrations.

Author: Stephen Haberman <stephen@exigencecorp.com>

Closes #5223 from stephenh/do_not_suppress_writer_exception and squashes the following commits:

c7ad53f [Stephen Haberman] [SPARK-6560][CORE] Do not suppress exceptions from writer.write.
parent 82701ee25f
commit b0d884f044
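To make the pattern concrete before the diff: below is a minimal sketch of the shape this change targets, lifted from the PairRDDFunctions hunk further down (the writer, iter, and hadoopContext names come from that code; the sketch itself is illustrative, not part of the patch).

    // Before: if writer.write throws and writer.close in the finally block also
    // throws, the close failure replaces (suppresses) the original write failure.
    try {
      while (iter.hasNext) {
        val pair = iter.next()
        writer.write(pair._1, pair._2)
      }
    } finally {
      writer.close(hadoopContext)
    }

    // After: tryWithSafeFinally still runs the second block, but if both blocks
    // throw, it logs the close failure and re-throws the original write exception.
    Utils.tryWithSafeFinally {
      while (iter.hasNext) {
        val pair = iter.next()
        writer.write(pair._1, pair._2)
      }
    } {
      writer.close(hadoopContext)
    }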
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
   def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
     val out = new ByteArrayOutputStream
     val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+    Utils.tryWithSafeFinally {
       // Since statuses can be modified in parallel, sync on it
       statuses.synchronized {
         objOut.writeObject(statuses)
       }
+    } {
       objOut.close()
+    }
     out.toByteArray
   }
@@ -614,9 +614,9 @@ private[spark] object PythonRDD extends Logging {
       try {
         val sock = serverSocket.accept()
         val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
-        try {
+        Utils.tryWithSafeFinally {
           writeIteratorToStream(items, out)
-        } finally {
+        } {
           out.close()
         }
       } catch {
@@ -862,9 +862,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     val file = File.createTempFile("broadcast", "", dir)
     path = file.getAbsolutePath
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       Utils.copyStream(in, out)
-    } finally {
+    } {
       out.close()
     }
   }
@@ -165,7 +165,7 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def write(id: Long, value: Any) {
     val file = getFile(id)
     val fileOutputStream = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       val out: OutputStream = {
         if (compress) {
           compressionCodec.compressedOutputStream(fileOutputStream)
@@ -175,10 +175,13 @@ private[broadcast] object HttpBroadcast extends Logging {
       }
       val ser = SparkEnv.get.serializer.newInstance()
       val serOut = ser.serializeStream(out)
+      Utils.tryWithSafeFinally {
         serOut.writeObject(value)
+      } {
         serOut.close()
+      }
       files += file
-    } finally {
+    } {
       fileOutputStream.close()
     }
   }
@@ -212,9 +215,11 @@ private[broadcast] object HttpBroadcast extends Logging {
     }
     val ser = SparkEnv.get.serializer.newInstance()
     val serIn = ser.deserializeStream(in)
-    val obj = serIn.readObject[T]()
-    serIn.close()
-    obj
+    Utils.tryWithSafeFinally {
+      serIn.readObject[T]()
+    } {
+      serIn.close()
+    }
   }

   /**
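Aside on the read path above: it can sit in return position because tryWithSafeFinally (added to Utils later in this diff) returns the value of its first by-name block. A minimal sketch of that shape, with a hypothetical reader that is not code from the patch:

    // Illustration only: the expression's value is whatever the first block produces,
    // so the whole tryWithSafeFinally call can be the method's result.
    def readFirstLine(file: java.io.File): String = {
      val reader = new java.io.BufferedReader(new java.io.FileReader(file))
      Utils.tryWithSafeFinally {
        reader.readLine()   // this value becomes the result of the whole expression
      } {
        reader.close()
      }
    }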
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 import akka.serialization.Serialization

 import org.apache.spark.Logging
+import org.apache.spark.util.Utils

 /**
@@ -59,9 +60,9 @@ private[master] class FileSystemPersistenceEngine(
     val serializer = serialization.findSerializerFor(value)
     val serialized = serializer.toBinary(value)
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       out.write(serialized)
-    } finally {
+    } {
       out.close()
     }
   }
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
 import com.google.common.base.Charsets

 import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils

 /**
  * A client that submits applications to the standalone Master using a REST protocol.
@@ -148,8 +149,11 @@ private[deploy] class StandaloneRestClient extends Logging {
     conn.setRequestProperty("charset", "utf-8")
     conn.setDoOutput(true)
     val out = new DataOutputStream(conn.getOutputStream)
+    Utils.tryWithSafeFinally {
       out.write(json.getBytes(Charsets.UTF_8))
+    } {
       out.close()
+    }
     readResponse(conn)
   }
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils

 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -112,8 +113,11 @@ private[spark] object CheckpointRDD extends Logging {
     }
     val serializer = env.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
+    Utils.tryWithSafeFinally {
       serializeStream.writeAll(iterator)
+    } {
       serializeStream.close()
+    }

     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.exists(finalOutputPath)) {
@@ -995,7 +995,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       require(writer != null, "Unable to obtain RecordWriter")
       var recordsWritten = 0L
-      try {
+      Utils.tryWithSafeFinally {
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
@@ -1004,7 +1004,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } finally {
+      } {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
@@ -1068,7 +1068,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
       var recordsWritten = 0L
-      try {
+
+      Utils.tryWithSafeFinally {
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1077,7 +1078,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } finally {
+      } {
         writer.close()
       }
       writer.commit()
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
+import org.apache.spark.util.Utils

 import IndexShuffleBlockManager.NOOP_REDUCE_ID
@@ -78,16 +79,15 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
   def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
     val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
-    try {
+    Utils.tryWithSafeFinally {
       // We take in lengths of each block, need to convert it to offsets.
       var offset = 0L
       out.writeLong(offset)

       for (length <- lengths) {
         offset += length
         out.writeLong(offset)
       }
-    } finally {
+    } {
       out.close()
     }
   }
@@ -23,6 +23,7 @@ import java.nio.channels.FileChannel
 import org.apache.spark.Logging
 import org.apache.spark.serializer.{SerializationStream, Serializer}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.util.Utils

 /**
  * An interface for writing JVM objects to some underlying storage. This interface allows
@@ -140,6 +141,7 @@ private[spark] class DiskBlockObjectWriter(

   override def close() {
     if (initialized) {
+      Utils.tryWithSafeFinally {
         if (syncWrites) {
           // Force outstanding writes to disk and track how long it takes
           objOut.flush()
@@ -147,7 +149,9 @@ private[spark] class DiskBlockObjectWriter(
           fos.getFD.sync()
         }
       }
+      } {
         objOut.close()
+      }

       channel = null
       bs = null
@@ -46,10 +46,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val channel = new FileOutputStream(file).getChannel
+    Utils.tryWithSafeFinally {
       while (bytes.remaining > 0) {
         channel.write(bytes)
       }
+    } {
       channel.close()
+    }
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
@@ -75,9 +78,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
     try {
-      try {
+      Utils.tryWithSafeFinally {
         blockManager.dataSerializeStream(blockId, outputStream, values)
-      } finally {
+      } {
         // Close outputStream here because it should be closed before file is deleted.
         outputStream.close()
       }
@@ -106,8 +109,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc

   private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
     val channel = new RandomAccessFile(file, "r").getChannel
-
-    try {
+    Utils.tryWithSafeFinally {
       // For small files, directly read rather than memory map
       if (length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(length.toInt)
@@ -123,7 +125,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
       } else {
         Some(channel.map(MapMode.READ_ONLY, offset, length))
       }
-    } finally {
+    } {
       channel.close()
     }
   }
@@ -313,7 +313,7 @@ private[spark] object Utils extends Logging {
       transferToEnabled: Boolean = false): Long =
   {
     var count = 0L
-    try {
+    tryWithSafeFinally {
       if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
         && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy performance.
@@ -353,7 +353,7 @@ private[spark] object Utils extends Logging {
         }
       }
       count
-    } finally {
+    } {
       if (closeStreams) {
         try {
           in.close()
@@ -1214,6 +1214,44 @@ private[spark] object Utils extends Logging {
     }
   }

+  /**
+   * Execute a block of code, then a finally block, but if exceptions happen in
+   * the finally block, do not suppress the original exception.
+   *
+   * This is primarily an issue with `finally { out.close() }` blocks, where
+   * close needs to be called to clean up `out`, but if an exception happened
+   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+   * fail as well. This would then suppress the original/likely more meaningful
+   * exception from the original `out.write` call.
+   */
+  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
+    // It would be nice to find a method on Try that did this
+    var originalThrowable: Throwable = null
+    try {
+      block
+    } catch {
+      case t: Throwable =>
+        // Purposefully not using NonFatal, because even fatal exceptions
+        // we don't want to have our finallyBlock suppress
+        originalThrowable = t
+        throw originalThrowable
+    } finally {
+      try {
+        finallyBlock
+      } catch {
+        case t: Throwable =>
+          if (originalThrowable != null) {
+            // We could do originalThrowable.addSuppressed(t), but it's
+            // not available in JDK 1.6.
+            logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
+            throw originalThrowable
+          } else {
+            throw t
+          }
+      }
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when
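A rough sketch of the resulting behavior when both blocks throw (illustrative only; the exception messages and the bare assert are not from the patch):

    // The body's exception is what reaches the caller; the exception thrown by
    // the finally block is only logged via logWarning and then discarded.
    try {
      Utils.tryWithSafeFinally {
        throw new java.io.IOException("write failed")   // original failure
      } {
        throw new java.io.IOException("close failed")   // would previously have won
      }
    } catch {
      case e: java.io.IOException => assert(e.getMessage == "write failed")
    }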
@@ -2074,7 +2112,7 @@ private[spark] class RedirectThread(
   override def run() {
     scala.util.control.Exception.ignoring(classOf[IOException]) {
       // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-      try {
+      Utils.tryWithSafeFinally {
         val buf = new Array[Byte](1024)
         var len = in.read(buf)
         while (len != -1) {
@@ -2082,7 +2120,7 @@ private[spark] class RedirectThread(
           out.flush()
           len = in.read(buf)
         }
-      } finally {
+      } {
         if (propagateEof) {
           out.close()
         }
@@ -728,25 +728,19 @@ private[spark] class ExternalSorter[K, V, C](
       // this simple we spill out the current in-memory collection so that everything is in files.
       spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
       partitionWriters.foreach(_.commitAndClose())
-      var out: FileOutputStream = null
-      var in: FileInputStream = null
+      val out = new FileOutputStream(outputFile, true)
       val writeStartTime = System.nanoTime
-      try {
-        out = new FileOutputStream(outputFile, true)
-        for (i <- 0 until numPartitions) {
-          in = new FileInputStream(partitionWriters(i).fileSegment().file)
-          val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
-          in.close()
-          in = null
-          lengths(i) = size
-        }
-      } finally {
-        if (out != null) {
-          out.close()
-        }
-        if (in != null) {
-          in.close()
-        }
+      util.Utils.tryWithSafeFinally {
+        for (i <- 0 until numPartitions) {
+          val in = new FileInputStream(partitionWriters(i).fileSegment().file)
+          util.Utils.tryWithSafeFinally {
+            lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
+          } {
+            in.close()
+          }
+        }
+      } {
+        out.close()
         context.taskMetrics.shuffleWriteMetrics.foreach(
           _.incShuffleWriteTime(System.nanoTime - writeStartTime))
       }