Adding option to force sync to the filesystem
This commit is contained in:
parent
3478ca6762
commit
a224c8c9b8
|
@ -63,17 +63,20 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val f: File = createFile(blockId /*, allowAppendExisting */)
|
private val f: File = createFile(blockId /*, allowAppendExisting */)
|
||||||
|
private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
|
||||||
|
|
||||||
// The file channel, used for repositioning / truncating the file.
|
// The file channel, used for repositioning / truncating the file.
|
||||||
private var channel: FileChannel = null
|
private var channel: FileChannel = null
|
||||||
private var bs: OutputStream = null
|
private var bs: OutputStream = null
|
||||||
|
private var fos: FileOutputStream = null
|
||||||
private var ts: TimeTrackingOutputStream = null
|
private var ts: TimeTrackingOutputStream = null
|
||||||
private var objOut: SerializationStream = null
|
private var objOut: SerializationStream = null
|
||||||
private var lastValidPosition = 0L
|
private var lastValidPosition = 0L
|
||||||
private var initialized = false
|
private var initialized = false
|
||||||
|
private var syncTime = 0L
|
||||||
|
|
||||||
override def open(): DiskBlockObjectWriter = {
|
override def open(): DiskBlockObjectWriter = {
|
||||||
val fos = new FileOutputStream(f, true)
|
fos = new FileOutputStream(f, true)
|
||||||
ts = new TimeTrackingOutputStream(fos)
|
ts = new TimeTrackingOutputStream(fos)
|
||||||
channel = fos.getChannel()
|
channel = fos.getChannel()
|
||||||
bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize))
|
bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize))
|
||||||
|
@ -84,9 +87,19 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
||||||
|
|
||||||
override def close() {
|
override def close() {
|
||||||
if (initialized) {
|
if (initialized) {
|
||||||
objOut.close()
|
if (syncWrites) {
|
||||||
|
val start = System.nanoTime()
|
||||||
|
objOut.flush()
|
||||||
|
fos.getFD.sync()
|
||||||
|
objOut.close()
|
||||||
|
syncTime += System.nanoTime() - start
|
||||||
|
} else {
|
||||||
|
objOut.close()
|
||||||
|
}
|
||||||
|
|
||||||
channel = null
|
channel = null
|
||||||
bs = null
|
bs = null
|
||||||
|
fos = null
|
||||||
ts = null
|
ts = null
|
||||||
objOut = null
|
objOut = null
|
||||||
}
|
}
|
||||||
|
@ -132,7 +145,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
||||||
override def size(): Long = lastValidPosition
|
override def size(): Long = lastValidPosition
|
||||||
|
|
||||||
override def timeWriting: Long = {
|
override def timeWriting: Long = {
|
||||||
Option(ts).map(t => t.timeWriting).getOrElse(0L) // ts could be null if never written to
|
// ts could be null if never written to
|
||||||
|
Option(ts).map(t => t.timeWriting).getOrElse(0L) + syncTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue