[SPARK-36255][SHUFFLE][CORE] Stop pushing and retrying on FileNotFound exceptions

### What changes were proposed in this pull request?
Once a shuffle is cleaned up by the `ContextCleaner`, the executors delete its shuffle files. Pushes of that shuffle's data by the executors then throw `FileNotFoundException`s because the files no longer exist. When such an exception is thrown from a `shuffle-block-push` thread, it causes the executor to exit; both the `shuffle-block-push` threads and the Netty event loops can hit `FileNotFoundException`s in this case. The fix stops these threads from pushing more blocks once they encounter a `FileNotFoundException`. When the exception comes from a `shuffle-block-push` thread, it is now handled and logged as a warning instead of failing the executor.
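The overall pattern, as a minimal self-contained Scala sketch (not the patch itself; `PushSketch`, its `println` logging, and the stubbed `pushUpToMax` are illustrative stand-ins for the real `ShuffleBlockPusher`/`ErrorHandler` members shown in the diffs below):
```scala
import java.io.{FileNotFoundException, IOException}

object PushSketch {
  // Stand-in for ShuffleBlockPusher.pushUpToMax(), which reads local shuffle files
  // and fails with FileNotFoundException once the shuffle has been cleaned up.
  def pushUpToMax(): Unit = throw new FileNotFoundException("shuffle_1_690_0.data")

  // Wrap the push so a deleted-shuffle-file error stops pushes for this shuffle
  // with a warning instead of reaching the uncaught-exception handler and
  // killing the executor.
  def tryPushUpToMax(): Unit =
    try pushUpToMax() catch {
      case e: FileNotFoundException =>
        println(s"WARN: shuffle files were deleted, stopping push for this shuffle: $e")
    }

  // Stand-in for the error handler's shouldRetryError: a FileNotFoundException cause
  // means the local shuffle files are gone, so no block of this shuffle is worth retrying.
  def shouldRetryError(t: Throwable): Boolean =
    !(t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException])

  def main(args: Array[String]): Unit = {
    tryPushUpToMax() // logs a warning instead of propagating
    val rpcFailure = new IOException("Failed to send RPC", new FileNotFoundException("file not found"))
    println(shouldRetryError(rpcFailure)) // false: stop pushing, do not retry
  }
}
```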

### Why are the changes needed?
This fixes a bug that causes executors to exit when they are instructed to clean up shuffle data.
Below is the stack trace of this exception:
```
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer{file=********/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190}
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer{file=*******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190}
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException: ******/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a unit test to verify that no more data is pushed when a `FileNotFoundException` is encountered. Also verified the fix in our environment.

Closes #33477 from otterc/SPARK-36255.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
Commit: 09e1c61272 (parent ae1c20ee0d), committed 2021-07-24 21:09:11 +08:00
3 changed files with 56 additions and 8 deletions

#### ErrorHandler.java (org.apache.spark.network.shuffle)

```
@@ -17,6 +17,7 @@
 package org.apache.spark.network.shuffle;
 
+import java.io.FileNotFoundException;
 import java.net.ConnectException;
 
 import com.google.common.base.Throwables;
@@ -82,8 +83,12 @@ public interface ErrorHandler {
     @Override
     public boolean shouldRetryError(Throwable t) {
-      // If it is a connection time out or a connection closed exception, no need to retry.
-      if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+      // If it is a connection time-out or a connection closed exception, no need to retry.
+      // If it is a FileNotFoundException originating from the client while pushing the shuffle
+      // blocks to the server, even then there is no need to retry. We will still log this exception
+      // once which helps with debugging.
+      if (t.getCause() != null && (t.getCause() instanceof ConnectException ||
+          t.getCause() instanceof FileNotFoundException)) {
         return false;
       }
       // If the block is too late, there is no need to retry it
```

#### ShuffleBlockPusher.scala (org.apache.spark.shuffle)

```
@@ -17,7 +17,7 @@
 package org.apache.spark.shuffle
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.net.ConnectException
 import java.nio.ByteBuffer
 import java.util.concurrent.ExecutorService
@@ -71,6 +71,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
       // blocks to just that host and continue push blocks to other hosts. So, here push of
       // all blocks will only stop when it is "Too Late". Also see updateStateAndCheckIfPushMore.
       override def shouldRetryError(t: Throwable): Boolean = {
+        // If it is a FileNotFoundException originating from the client while pushing the shuffle
+        // blocks to the server, then we stop pushing all the blocks because this indicates the
+        // shuffle files are deleted and subsequent block push will also fail.
+        if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) {
+          return false
+        }
         // If the block is too late, there is no need to retry it
         !Throwables.getStackTraceAsString(t).contains(BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)
       }
@@ -100,10 +106,22 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
     pushRequests ++= Utils.randomize(requests)
     submitTask(() => {
-      pushUpToMax()
+      tryPushUpToMax()
     })
   }
 
+  private[shuffle] def tryPushUpToMax(): Unit = {
+    try {
+      pushUpToMax()
+    } catch {
+      case e: FileNotFoundException =>
+        logWarning("The shuffle files got deleted when this shuffle-block-push-thread " +
+          "was reading from them which could happen when the job finishes and the driver " +
+          "instructs the executor to cleanup the shuffle. In this case, push of the blocks " +
+          "belonging to this shuffle will stop.", e)
+    }
+  }
+
   /**
    * Triggers the push. It's a separate method for testing.
    * VisibleForTesting
@@ -201,7 +219,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         submitTask(() => {
           if (updateStateAndCheckIfPushMore(
               sizeMap(result.blockId), address, remainingBlocks, result)) {
-            pushUpToMax()
+            tryPushUpToMax()
           }
         })
       }
@@ -297,7 +315,8 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
       }
     }
     if (pushResult.failure != null && !errorHandler.shouldRetryError(pushResult.failure)) {
-      logDebug(s"Received after merge is finalized from $address. Not pushing any more blocks.")
+      logDebug(s"Encountered an exception from $address which indicates that push needs to " +
+        s"stop.")
       return false
     } else {
       remainingBlocks.isEmpty && (pushRequests.nonEmpty || deferredPushRequests.nonEmpty)
```

#### ShuffleBlockPusherSuite.scala (org.apache.spark.shuffle)

```
@@ -17,7 +17,7 @@
 package org.apache.spark.shuffle
 
-import java.io.File
+import java.io.{File, FileNotFoundException, IOException}
 import java.net.ConnectException
 import java.nio.ByteBuffer
 import java.util.concurrent.LinkedBlockingQueue
@@ -324,8 +324,32 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(pusher.unreachableBlockMgrs.size == 2)
   }
 
+  test("SPARK-36255: FileNotFoundException stops the push") {
+    when(dependency.getMergerLocs).thenReturn(
+      Seq(BlockManagerId("client1", "client1", 1), BlockManagerId("client2", "client2", 2)))
+    conf.set("spark.reducer.maxReqsInFlight", "1")
+    val pusher = new TestShuffleBlockPusher(conf)
+    when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
+      .thenAnswer((invocation: InvocationOnMock) => {
+        val pushedBlocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
+        val blockFetchListener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+        pushedBlocks.foreach(blockId => {
+          blockFetchListener.onBlockFetchFailure(
+            blockId, new IOException("Failed to send RPC",
+              new FileNotFoundException("file not found")))
+        })
+      })
+    pusher.initiateBlockPush(
+      mock(classOf[File]), Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0)
+    pusher.runPendingTasks()
+    verify(shuffleClient, times(1))
+      .pushBlocks(any(), any(), any(), any(), any())
+    assert(pusher.tasks.isEmpty)
+    ShuffleBlockPusher.stop()
+  }
+
   private class TestShuffleBlockPusher(conf: SparkConf) extends ShuffleBlockPusher(conf) {
-    private[this] val tasks = new LinkedBlockingQueue[Runnable]
+    val tasks = new LinkedBlockingQueue[Runnable]
 
     override protected def submitTask(task: Runnable): Unit = {
       tasks.add(task)
```