diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index d053958e23..2a246d7808 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -63,17 +63,20 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } private val f: File = createFile(blockId /*, allowAppendExisting */) + private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean // The file channel, used for repositioning / truncating the file. private var channel: FileChannel = null private var bs: OutputStream = null + private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null private var lastValidPosition = 0L private var initialized = false + private var syncTime = 0L override def open(): DiskBlockObjectWriter = { - val fos = new FileOutputStream(f, true) + fos = new FileOutputStream(f, true) ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize)) @@ -84,9 +87,19 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def close() { if (initialized) { - objOut.close() + if (syncWrites) { + val start = System.nanoTime() + objOut.flush() + fos.getFD.sync() + objOut.close() + syncTime += System.nanoTime() - start + } else { + objOut.close() + } + channel = null bs = null + fos = null ts = null objOut = null } @@ -132,7 +145,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) override def size(): Long = lastValidPosition override def timeWriting: Long = { - Option(ts).map(t => t.timeWriting).getOrElse(0L) // ts could be null if never written to + // ts could be null if never written to + Option(ts).map(t => t.timeWriting).getOrElse(0L) + syncTime } }