[SPARK-17371] Resubmitted shuffle outputs can get deleted by zombie map tasks

## What changes were proposed in this pull request?

It seems that old shuffle map tasks that are still hanging around after a stage resubmit can, in their stop() method, delete the shuffle output files that downstream stages are meant to read, causing those stages to fail even after the resubmitted stage completes successfully. This can easily happen if the prior map task is blocked waiting on a network timeout when its stage is resubmitted.

This can cause unnecessary stage resubmissions, sometimes repeated several times as fetch failures trigger a cascade of shuffle file invalidations, as well as confusing FetchFailure messages that report shuffle index files missing from local disk.
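
To make the failure mode concrete, here is a minimal, runnable Scala sketch (the object, map, and method names are invented stand-ins, not Spark's API): because every attempt of a map task keys its output by the same (shuffleId, mapId) pair, a zombie attempt's cleanup removes the files the resubmitted attempt just committed.

```scala
// Illustrative sketch of the race, not Spark's actual code.
object ZombieCleanupRace {
  // Stand-in for the shuffle directory: (shuffleId, mapId) -> committed file.
  private val committed =
    scala.collection.concurrent.TrieMap.empty[(Int, Int), String]

  def commit(shuffleId: Int, mapId: Int, attempt: Int): Unit =
    committed((shuffleId, mapId)) = s"shuffle_${shuffleId}_$mapId (attempt $attempt)"

  // Pre-fix behavior: stop(success = false) deleted output keyed only by
  // (shuffleId, mapId), with no notion of which attempt wrote it.
  def stopFailedAttempt(shuffleId: Int, mapId: Int): Unit =
    committed.remove((shuffleId, mapId))

  def main(args: Array[String]): Unit = {
    commit(0, 7, attempt = 2)       // resubmitted attempt commits its output
    stopFailedAttempt(0, 7)         // zombie attempt 1 finally times out and stops
    println(committed.get((0, 7)))  // None: the committed output is gone
  }
}
```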

Given that IndexShuffleBlockResolver commits data atomically, it seems unnecessary to ever delete committed task output: even in the rare case that a task fails after it has finished committing its shuffle output, it should be safe to retain that output.
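
As a rough illustration of the atomic-commit property this argument relies on, below is a minimal Scala sketch of the write-to-temp-then-rename pattern (illustrative only; `AtomicCommitSketch` is an invented name, and IndexShuffleBlockResolver's real commit path is more involved). A reader either sees the complete published file or no file at all, so once the rename happens there is nothing half-written to undo.

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

object AtomicCommitSketch {
  def commit(shuffleId: Int, mapId: Int, bytes: Array[Byte]): Unit = {
    // Stage output in a temp file in the same directory as the destination,
    // so the rename below stays within one filesystem.
    val dir = Paths.get(".")
    val tmp = Files.createTempFile(dir, s"shuffle_${shuffleId}_$mapId", ".tmp")
    Files.write(tmp, bytes)
    // Publish atomically: the destination appears fully written or not at all.
    Files.move(tmp, dir.resolve(s"shuffle_${shuffleId}_$mapId.data"),
      StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit =
    commit(0, 7, "partition bytes".getBytes("UTF-8"))
}
```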

## How was this patch tested?

Prior to the fix proposed in https://github.com/apache/spark/pull/14931, I was able to reproduce this behavior by killing slaves in the middle of a large shuffle. After this patch, stages were no longer resubmitted multiple times due to shuffle index loss.

cc JoshRosen vanzin

Author: Eric Liang <ekl@databricks.com>

Closes #14932 from ericl/dont-remove-committed-files.
Authored by Eric Liang on 2016-09-06 16:55:22 -07:00; committed by Josh Rosen.
Commit c07cbb3534 (parent 175b434411): 3 changed files with 0 additions and 5 deletions.

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

```diff
@@ -238,7 +238,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
             partitionWriters = null;
           }
         }
-        shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
         return None$.empty();
       }
     }
```

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

```diff
@@ -465,8 +465,6 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
           }
           return Option.apply(mapStatus);
         } else {
-          // The map task failed, so delete our output data.
-          shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
           return Option.apply(null);
         }
       }
```

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

```diff
@@ -83,8 +83,6 @@ private[spark] class SortShuffleWriter[K, V, C](
       if (success) {
         return Option(mapStatus)
       } else {
-        // The map task failed, so delete our output data.
-        shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
         return None
       }
     } finally {
```
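
For reference, here is a self-contained Scala sketch of the contract after this change (all names are hypothetical, not Spark's classes): a failed stop() simply reports no MapStatus, and leaves any already-committed (shuffleId, mapId) output in place for a successful attempt to serve.

```scala
// Hypothetical writer illustrating the post-fix stop() semantics.
final class SketchShuffleWriter(shuffleId: Int, mapId: Int) {
  private var stopping = false
  private var mapStatus: Option[String] = None // stand-in for MapStatus

  def write(records: Iterator[String]): Unit = {
    records.foreach(_ => ()) // ... write and atomically commit output ...
    mapStatus = Some(s"committed shuffle_${shuffleId}_$mapId")
  }

  def stop(success: Boolean): Option[String] = {
    if (stopping) return None
    stopping = true
    if (success) mapStatus
    else None // pre-fix code also deleted the (shuffleId, mapId) files here
  }
}
```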