[SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted

### What changes were proposed in this pull request?

When a task thread is interrupted, the underlying output stream referred to by `DiskBlockObjectWriter.mcs` may already have been closed, so flushing the buffered data throws an IOException. This breaks the assumption that `revertPartialWritesAndClose()` should not throw exceptions.

To fix the issue, we catch and log the IOException in `ManualCloseOutputStream.manualClose()`.
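
The failure mode can be reproduced outside Spark. The following is a minimal sketch, not Spark's code: `ManualCloseSketch`, `closeQuietly`, and `demo` are hypothetical names, and the interruption is simulated by closing the underlying `FileOutputStream` directly. It shows that closing a `BufferedOutputStream` with pending data throws once the underlying stream is closed, and that catching the `IOException` lets the caller continue.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}

object ManualCloseSketch {
  // Hypothetical helper (not a Spark API): close the stream, log and swallow
  // any IOException, and report whether the close succeeded cleanly.
  def closeQuietly(out: OutputStream): Boolean = {
    try {
      out.close()
      true
    } catch {
      case e: IOException =>
        System.err.println(s"Ignoring failure while closing stream: ${e.getMessage}")
        false
    }
  }

  // Returns true if the buffered stream closed cleanly, false if the close failed.
  def demo(): Boolean = {
    val file = File.createTempFile("temp_shuffle_sketch", ".tmp")
    try {
      val fos = new FileOutputStream(file)
      val buffered = new BufferedOutputStream(fos)
      // Three bytes fit in the default buffer, so nothing reaches the file yet.
      buffered.write(Array[Byte](1, 2, 3))
      // Assumption: stands in for the stream being closed by a thread interrupt.
      fos.close()
      // close() flushes the buffer first, which fails on the closed stream.
      closeQuietly(buffered)
    } finally {
      file.delete()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"closed cleanly: ${demo()}")
  }
}
```

Because the exception is handled inside the helper, the caller reaches its own cleanup code, which is the property `revertPartialWritesAndClose()` relies on.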

### Why are the changes needed?

Previously the IOException was not caught, so `revertPartialWritesAndClose()` threw an exception. When this happened, `BypassMergeSortShuffleWriter.stop()` stopped deleting the temp_shuffle files tracked by `partitionWriters`, hence leaking temp_shuffle files.
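
To illustrate why one throwing writer leaks every remaining file, here is a simplified model of such a cleanup loop. It is a sketch under stated assumptions, not the actual `BypassMergeSortShuffleWriter` code: `FakeWriter`, `cleanup`, and `leakedAfterCleanup` are hypothetical names.

```scala
import java.io.{File, IOException}

object CleanupLoopSketch {
  // Hypothetical stand-in for a partition writer; not a Spark class.
  final class FakeWriter(val file: File, failOnRevert: Boolean) {
    def revertAndClose(): File = {
      if (failOnRevert) throw new IOException("Stream Closed")
      file
    }
  }

  // Deletes each writer's file. The first exception aborts the loop, so the
  // files of all later writers are never deleted.
  def cleanup(writers: Seq[FakeWriter]): Unit = {
    writers.foreach { w =>
      val f = w.revertAndClose()
      f.delete()
    }
  }

  // Returns how many temp files still exist after the cleanup attempt.
  def leakedAfterCleanup(failFirst: Boolean): Int = {
    val writers = (0 until 3).map { i =>
      new FakeWriter(File.createTempFile("temp_shuffle_sketch", ".tmp"), failFirst && i == 0)
    }
    try cleanup(writers) catch { case _: IOException => () }
    val leaked = writers.count(_.file.exists())
    writers.foreach(_.file.delete()) // tidy up the sketch's own files
    leaked
  }

  def main(args: Array[String]): Unit = {
    println(s"leaked when the first writer throws: ${leakedAfterCleanup(failFirst = true)}")
    println(s"leaked when no writer throws: ${leakedAfterCleanup(failFirst = false)}")
  }
}
```

In this model, making the revert step non-throwing is enough for the loop to visit every writer, which mirrors the approach taken by this fix.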

### Does this PR introduce _any_ user-facing change?

No, this is an internal bug fix.

### How was this patch tested?

Tested by running a longevity stress test. After the fix, there are no more leaked temp_shuffle files.

Closes #33731 from jiangxb1987/temp_shuffle.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit ec5f3a17e3)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Xingbo Jiang 2021-08-13 19:25:20 +09:00 committed by Hyukjin Kwon
parent eaf92bea99
commit 09a1ddba41

@@ -17,7 +17,7 @@
 package org.apache.spark.storage
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
+import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
 import java.util.zip.Checksum
@@ -64,7 +64,16 @@ private[spark] class DiskBlockObjectWriter(
     }
     def manualClose(): Unit = {
-      super.close()
+      try {
+        super.close()
+      } catch {
+        // The output stream may have been closed when the task thread is interrupted, then we
+        // get IOException when flushing the buffered data. We should catch and log the exception
+        // to ensure the revertPartialWritesAndClose() function doesn't throw an exception.
+        case e: IOException =>
+          logError("Exception occurred while manually close the output stream to file "
+            + file + ", " + e.getMessage)
+      }
     }
   }