From 09a1ddba41519f6aeb2241cae4d95a3bd194301d Mon Sep 17 00:00:00 2001
From: Xingbo Jiang
Date: Fri, 13 Aug 2021 19:25:20 +0900
Subject: [PATCH] [SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted

### What changes were proposed in this pull request?

When a task thread is interrupted, the underlying output stream referred to by `DiskBlockObjectWriter.mcs` may have already been closed, so we get an IOException when flushing the buffered data. This breaks the assumption that `revertPartialWritesAndClose()` should not throw exceptions. To fix the issue, we catch the IOException in `ManualCloseOutputStream.manualClose()`.

### Why are the changes needed?

Previously the IOException was not caught, so `revertPartialWritesAndClose()` threw an exception. When this happens, `BypassMergeSortShuffleWriter.stop()` fails to delete the temp_shuffle files tracked by `partitionWriters`, which leads to temp_shuffle file leaks.

### Does this PR introduce _any_ user-facing change?

No, this is an internal bug fix.

### How was this patch tested?

Tested by running a longevity stress test. After the fix, there are no more leaked temp_shuffle files.

Closes #33731 from jiangxb1987/temp_shuffle.

Authored-by: Xingbo Jiang
Signed-off-by: Hyukjin Kwon
(cherry picked from commit ec5f3a17e33f7afe03e48f8b7690a8b18ae0c058)
Signed-off-by: Hyukjin Kwon
---
 .../spark/storage/DiskBlockObjectWriter.scala | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index f5d8c0219d..662f63db54 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
+import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
 import java.util.zip.Checksum
 
@@ -64,7 +64,16 @@ private[spark] class DiskBlockObjectWriter(
     }
 
     def manualClose(): Unit = {
-      super.close()
+      try {
+        super.close()
+      } catch {
+        // The output stream may have already been closed when the task thread was interrupted,
+        // in which case flushing the buffered data throws an IOException. Catch and log the
+        // exception to ensure revertPartialWritesAndClose() doesn't throw an exception.
+        case e: IOException =>
+          logError("Exception occurred while manually closing the output stream to file " +
+            file + ", " + e.getMessage)
+      }
     }
   }
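
For readers who want to see the hardened pattern in isolation, below is a minimal, self-contained Scala sketch. The trait mirrors the shape of the private `ManualCloseOutputStream` inside `DiskBlockObjectWriter` (regular `close()` only flushes; the real close is deferred to `manualClose()`), but `InterruptedStream`, `ManualCloseSketch`, and the `System.err` logging are illustrative stand-ins, not Spark's exact code (Spark logs through its `Logging` trait).

```scala
import java.io.{IOException, OutputStream}

// Simplified stand-in for the private ManualCloseOutputStream mixin: the
// regular close() only flushes so the writer can keep appending across
// commits, and the real close is deferred to manualClose(). After this
// patch, manualClose() never lets an IOException escape, so cleanup paths
// like revertPartialWritesAndClose() cannot be aborted by a stream that a
// task interrupt already closed underneath us.
trait ManualCloseOutputStream extends OutputStream {
  abstract override def close(): Unit = {
    flush()
  }

  def manualClose(): Unit = {
    try {
      super.close()
    } catch {
      case e: IOException =>
        // Log and continue; an already-closed stream must not abort cleanup.
        System.err.println(s"Exception while manually closing the stream: ${e.getMessage}")
    }
  }
}

// Hypothetical stream simulating the state after a task interrupt: the
// underlying resource is gone, so every flush or close fails.
class InterruptedStream extends OutputStream {
  override def write(b: Int): Unit = throw new IOException("Stream closed")
  override def flush(): Unit = throw new IOException("Stream closed")
  override def close(): Unit = throw new IOException("Stream closed")
}

object ManualCloseSketch {
  def main(args: Array[String]): Unit = {
    val mcs = new InterruptedStream with ManualCloseOutputStream
    // Before the patch this call would propagate the IOException; now it
    // only logs, so the caller's cleanup can proceed.
    mcs.manualClose()
    println("cleanup continues; temp_shuffle files can still be deleted")
  }
}
```

In the sketch, without the try/catch the `IOException` from `manualClose()` would propagate to `revertPartialWritesAndClose()`, and `BypassMergeSortShuffleWriter.stop()` would abort before deleting the remaining temp files tracked by `partitionWriters`, which is exactly the leak this patch fixes.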