[SPARK-29322][CORE] Enable closeFrameOnFlush on ZstdOutputStream for event log file

### What changes were proposed in this pull request?

This patch enables `closeFrameOnFlush` on the `ZstdOutputStream` used by the event logger, so that the continuous zstd input stream does not get stuck when reading an "inprogress" event log file.

The issue appears to have been introduced by [SPARK-26283](https://issues.apache.org/jira/browse/SPARK-26283), which fixed a bug in reading event log files by enabling continuous mode, but in doing so changed the input stream's behavior to read open frames, where it appears to wait for the frame to be closed. Enabling `closeFrameOnFlush` closes the frame whenever `flush()` is called, so the input stream can read the frame sooner.

As a counterpart to `compressedContinuousInputStream`, this patch adds `compressedContinuousOutputStream`, which is used only for event logging.
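
To illustrate the mechanics, here is a minimal standalone sketch using zstd-jni directly, outside Spark (the object name and sample payload are mine):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}

object CloseFrameOnFlushDemo {
  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()

    // Writer side: with closeFrameOnFlush enabled, every flush() ends the
    // current zstd frame, so everything written so far becomes decodable.
    val out = new ZstdOutputStream(sink, 3).setCloseFrameOnFlush(true)
    out.write("""{"Event":"SparkListenerApplicationStart"}""".getBytes("UTF-8"))
    out.flush() // frame is closed here; a concurrent reader is no longer stuck

    // Reader side: continuous mode (what compressedContinuousInputStream
    // enables) tolerates a stream that is still being appended to.
    val in = new ZstdInputStream(new ByteArrayInputStream(sink.toByteArray))
      .setContinuous(true)
    val buf = new Array[Byte](4096)
    val n = in.read(buf)
    println(new String(buf, 0, n, "UTF-8"))
  }
}
```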

### Why are the changes needed?

Without this patch, the reader thread in the Spark History Server (SHS) gets stuck reading a zstd-compressed in-progress event log file until the application finishes.

### Does this PR introduce any user-facing change?

It may add some overhead on each flush when writing a zstd-compressed event log, so a small performance hit could be introduced. The change is therefore restricted to event logging only.
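
To see where that overhead comes from: each closed frame adds framing bytes and resets the compression state, so frequent flushes compress worse. A rough standalone sketch (zstd-jni only; the object name and sample payload are mine, and actual numbers will vary):

```scala
import java.io.ByteArrayOutputStream
import com.github.luben.zstd.ZstdOutputStream

object FlushOverheadSketch {
  def compressWithFlushes(closeFrameOnFlush: Boolean): Int = {
    val sink = new ByteArrayOutputStream()
    val out = new ZstdOutputStream(sink, 3).setCloseFrameOnFlush(closeFrameOnFlush)
    val line = """{"Event":"SparkListenerTaskEnd","Stage ID":1}""" + "\n"
    for (_ <- 1 to 1000) {
      out.write(line.getBytes("UTF-8"))
      out.flush() // with closeFrameOnFlush=true, every flush ends the current frame
    }
    out.close()
    sink.size()
  }

  def main(args: Array[String]): Unit = {
    println(s"closeFrameOnFlush=false: ${compressWithFlushes(false)} bytes")
    println(s"closeFrameOnFlush=true:  ${compressWithFlushes(true)} bytes")
  }
}
```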

### How was this patch tested?

Manually tested by setting the Spark configuration as below:

```
spark.eventLog.enabled               true
spark.eventLog.compress              true
spark.eventLog.compression.codec     zstd
```

and starting a Spark application. While the application is running, load the application in the SHS web UI.
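
Equivalently (a sketch of mine, not from the PR), the same settings can be applied programmatically:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Mirrors the spark-defaults.conf entries above; app name and master are
// arbitrary, and spark.eventLog.dir (default /tmp/spark-events) must exist.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("zstd-eventlog-repro")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.compress", "true")
  .set("spark.eventLog.compression.codec", "zstd")

val sc = new SparkContext(conf)
// Keep some work running, then load this application in the SHS web UI.
sc.parallelize(1 to 1000000).map(_ * 2L).count()
```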

Before this patch, replaying the event log may occasionally succeed, but it will most likely get stuck, and the loading page will hang as well. After this patch, SHS reads the in-progress event log file properly.

Closes #25996 from HeartSaVioR/SPARK-29322.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
commit e44d191bf7 (parent 51d6ba7490)
Date: 2019-10-02 20:48:38 -07:00
4 changed files with 14 additions and 3 deletions

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

```diff
@@ -44,6 +44,10 @@ trait CompressionCodec {

   def compressedOutputStream(s: OutputStream): OutputStream

+  private[spark] def compressedContinuousOutputStream(s: OutputStream): OutputStream = {
+    compressedOutputStream(s)
+  }
+
   def compressedInputStream(s: InputStream): InputStream

   private[spark] def compressedContinuousInputStream(s: InputStream): InputStream = {
@@ -220,6 +224,12 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
   }

+  override private[spark] def compressedContinuousOutputStream(s: OutputStream) = {
+    // SPARK-29322: Set "closeFrameOnFlush" to 'true' so that the continuous input stream
+    // does not get stuck reading an open frame.
+    new BufferedOutputStream(new ZstdOutputStream(s, level).setCloseFrameOnFlush(true), bufferSize)
+  }
+
   override def compressedInputStream(s: InputStream): InputStream = {
     // Wrap the zstd input stream in a buffered input stream so that we can
     // avoid the excessive overhead of JNI calls when uncompressing small amounts of data.
```
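
For context, the matching reader side added by SPARK-26283 (not part of this diff) is what consumes these frames; reproduced approximately from `CompressionCodec.scala`:

```scala
// Approximate excerpt (SPARK-26283) of ZStdCompressionCodec's reader side,
// shown for context only; see CompressionCodec.scala for the exact code.
override private[spark] def compressedContinuousInputStream(s: InputStream): InputStream = {
  // Continuous mode can keep reading a zstd stream whose last frame is still
  // open, e.g. an "inprogress" event log that is being appended to.
  new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
}
```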

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

```diff
@@ -126,7 +126,8 @@ private[spark] class EventLoggingListener(
     }

     try {
-      val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+      val cstream = compressionCodec.map(_.compressedContinuousOutputStream(dstream))
+        .getOrElse(dstream)
       val bstream = new BufferedOutputStream(cstream, outputBufferSize)

       EventLoggingListener.initEventLog(bstream, testing, loggedEvents)
```

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

```diff
@@ -1254,7 +1254,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
   private def writeFile(file: File, codec: Option[CompressionCodec],
     events: SparkListenerEvent*) = {
     val fstream = new FileOutputStream(file)
-    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+    val cstream = codec.map(_.compressedContinuousOutputStream(fstream)).getOrElse(fstream)
     val bstream = new BufferedOutputStream(cstream)
     EventLoggingListener.initEventLog(bstream, false, null)
```

core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala

```diff
@@ -89,7 +89,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
   test("Replay compressed inprogress log file succeeding on partial read") {
     val buffered = new ByteArrayOutputStream
     val codec = new LZ4CompressionCodec(new SparkConf())
-    val compstream = codec.compressedOutputStream(buffered)
+    val compstream = codec.compressedContinuousOutputStream(buffered)
     val cwriter = new OutputStreamWriter(compstream, StandardCharsets.UTF_8)
     Utils.tryWithResource(new PrintWriter(cwriter)) { writer =>
```