diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index e2b24298a5..173c3291b1 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ +import org.apache.spark.Logging import org.apache.spark.serializer.Serializer -import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap} -import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup +import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap} +import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} /** A group of writers for a ShuffleMapTask, one writer per reducer. */ private[spark] trait ShuffleWriterGroup { @@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup { * files within a ShuffleFileGroups associated with the block's reducer. */ private[spark] -class ShuffleBlockManager(blockManager: BlockManager) { +class ShuffleBlockManager(blockManager: BlockManager) extends Logging { def conf = blockManager.conf // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. @@ -106,6 +107,12 @@ class ShuffleBlockManager(blockManager: BlockManager) { Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockFile = blockManager.diskBlockManager.getFile(blockId) + // Because of previous failures, the shuffle file may already exist on this machine. + // If so, remove it. + if (blockFile.exists()) { + val removed = blockFile.delete() + logInfo(s"Removed existing shuffle file $blockFile successfully: $removed") + } blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) } }