[SPARK-21444] Be more defensive when removing broadcasts in MapOutputTracker

## What changes were proposed in this pull request?

In SPARK-21444, sitalkedia reported an issue where the `Broadcast.destroy()` call in `MapOutputTracker`'s `ShuffleStatus.invalidateSerializedMapOutputStatusCache()` was failing with an `IOException`, causing the DAGScheduler to crash and bring down the entire driver.

This is a bug introduced by #17955. In the old code, we removed a broadcast variable by calling `BroadcastManager.unbroadcast` with `blocking=false`, but the new code simply calls `Broadcast.destroy()`, which can fail with an `IOException` if certain blocking RPCs time out.

The fix implemented here is to replace this with a call to `destroy(blocking = false)` and to wrap the entire operation in `Utils.tryLogNonFatalError`.
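
As a rough illustration of the pattern described above, the following standalone sketch shows how wrapping a failing cleanup call lets the surrounding state reset still run. It does not use Spark itself: `tryLogNonFatalError` here is a simplified stand-in modeled on `Utils.tryLogNonFatalError`, and `FlakyBroadcast` and `CacheHolder` are hypothetical names invented for this example.

```scala
import java.io.IOException
import scala.util.control.NonFatal

object DefensiveCleanupSketch {
  // Simplified stand-in for Spark's Utils.tryLogNonFatalError (assumption:
  // logging goes to stderr here). Runs the block and logs any non-fatal
  // exception instead of rethrowing it.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) =>
        System.err.println(
          s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
    }
  }

  // Hypothetical broadcast whose cleanup always fails, simulating an RPC timeout.
  final class FlakyBroadcast {
    def destroy(blocking: Boolean): Unit =
      throw new IOException(s"cleanup RPC timed out (blocking = $blocking)")
  }

  // Hypothetical holder that mirrors the shape of the cache invalidation logic.
  final class CacheHolder(private var cached: FlakyBroadcast) {
    def invalidate(): Unit = synchronized {
      if (cached != null) {
        // A failure in destroy() is logged and swallowed, so it cannot escape
        // to the caller.
        tryLogNonFatalError {
          cached.destroy(blocking = false)
        }
        // The reference is cleared whether or not cleanup succeeded.
        cached = null
      }
    }

    def isCleared: Boolean = synchronized { cached == null }
  }

  def main(args: Array[String]): Unit = {
    val holder = new CacheHolder(new FlakyBroadcast)
    holder.invalidate() // logs the IOException instead of propagating it
    println(s"cache cleared: ${holder.isCleared}")
  }
}
```

In this sketch the `IOException` never reaches the caller of `invalidate()`, which is the property that keeps a cleanup failure from taking down the scheduler thread. Fatal errors such as `OutOfMemoryError` are not matched by `NonFatal` and would still propagate.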

## How was this patch tested?

I haven't written regression tests for this because it's really hard to inject mocks to simulate RPC failures here. Instead, this class of issue is probably best uncovered with more generalized error injection / network unreliability / fuzz testing tools.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #18662 from JoshRosen/SPARK-21444.
Josh Rosen 2017-07-17 20:40:32 -07:00
parent e9faae135c
commit 5952ad2b40

@@ -194,7 +194,12 @@ private class ShuffleStatus(numPartitions: Int) {
    */
   def invalidateSerializedMapOutputStatusCache(): Unit = synchronized {
     if (cachedSerializedBroadcast != null) {
-      cachedSerializedBroadcast.destroy()
+      // Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444)
+      Utils.tryLogNonFatalError {
+        // Use `blocking = false` so that this operation doesn't hang while trying to send cleanup
+        // RPCs to dead executors.
+        cachedSerializedBroadcast.destroy(blocking = false)
+      }
       cachedSerializedBroadcast = null
     }
     cachedSerializedMapStatus = null