Remove shuffle files if they are still present on a machine.

This commit is contained in:
Patrick Wendell 2014-01-20 19:11:22 -08:00
parent f84400e86c
commit de526ad527

View file

@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
def conf = blockManager.conf
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@ -106,6 +107,12 @@ class ShuffleBlockManager(blockManager: BlockManager) {
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
if (blockFile.exists()) {
val removed = blockFile.delete()
logInfo(s"Removed existing shuffle file $blockFile successfully: $removed")
}
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
}
}