From e44d191bf74dece6e64b0189f48e089f57a90204 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 2 Oct 2019 20:48:38 -0700
Subject: [PATCH] [SPARK-29322][CORE] Enable closeFrameOnFlush on ZstdOutputStream for event log file

### What changes were proposed in this pull request?

This patch proposes enabling `closeFrameOnFlush` on ZstdOutputStream specifically for the event logger, so that the continuous zstd input stream does not get stuck when reading an "inprogress" event log file. The issue seems to have been introduced by [SPARK-26283](https://issues.apache.org/jira/browse/SPARK-26283), which fixed a bug by reading the event log file in continuous mode, but that change made the input stream read an open frame, which appears to wait for the frame to be closed. Enabling `closeFrameOnFlush` closes the frame whenever flush is called, so the input stream can read the frame sooner.

As a counterpart to `compressedContinuousInputStream`, this patch adds `compressedContinuousOutputStream`, which is used only for event logging.

### Why are the changes needed?

Without this patch, the reader thread in SHS gets stuck reading an inprogress event log file compressed with zstd until the application finishes.

### Does this PR introduce any user-facing change?

It may add some overhead on each flush when writing a zstd-compressed event log, so a performance hit could be introduced. The change is restricted to event logging only.

### How was this patch tested?

Manually tested, by setting the Spark configuration as below:

```
spark.eventLog.enabled true
spark.eventLog.compress true
spark.eventLog.compression.codec zstd
```

and starting a Spark application. While the application is running, load the application in the SHS web page. Before this patch, replaying the event log may succeed, but more likely it gets stuck and the loading page hangs as well. After this patch, SHS can properly read the inprogress event log file.

Closes #25996 from HeartSaVioR/SPARK-29322.

Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/io/CompressionCodec.scala      | 10 ++++++++++
 .../apache/spark/scheduler/EventLoggingListener.scala     |  3 ++-
 .../spark/deploy/history/FsHistoryProviderSuite.scala     |  2 +-
 .../apache/spark/scheduler/ReplayListenerSuite.scala      |  2 +-
 4 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index adbd59c9f0..5205a2d568 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -44,6 +44,10 @@ trait CompressionCodec {
 
   def compressedOutputStream(s: OutputStream): OutputStream
 
+  private[spark] def compressedContinuousOutputStream(s: OutputStream): OutputStream = {
+    compressedOutputStream(s)
+  }
+
   def compressedInputStream(s: InputStream): InputStream
 
   private[spark] def compressedContinuousInputStream(s: InputStream): InputStream = {
@@ -220,6 +224,12 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
   }
 
+  override private[spark] def compressedContinuousOutputStream(s: OutputStream) = {
+    // SPARK-29322: Set "closeFrameOnFlush" to 'true' so that the continuous input stream is not
+    // stuck reading an open frame.
+    new BufferedOutputStream(new ZstdOutputStream(s, level).setCloseFrameOnFlush(true), bufferSize)
+  }
+
   override def compressedInputStream(s: InputStream): InputStream = {
     // Wrap the zstd input stream in a buffered input stream so that we can
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 0c0259f12b..a0a4428dc7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -126,7 +126,8 @@ private[spark] class EventLoggingListener(
     }
 
     try {
-      val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+      val cstream = compressionCodec.map(_.compressedContinuousOutputStream(dstream))
+        .getOrElse(dstream)
       val bstream = new BufferedOutputStream(cstream, outputBufferSize)
 
       EventLoggingListener.initEventLog(bstream, testing, loggedEvents)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 9a2198e510..1d465ba373 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1254,7 +1254,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
   private def writeFile(file: File, codec: Option[CompressionCodec],
     events: SparkListenerEvent*) = {
     val fstream = new FileOutputStream(file)
-    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+    val cstream = codec.map(_.compressedContinuousOutputStream(fstream)).getOrElse(fstream)
     val bstream = new BufferedOutputStream(cstream)
 
     EventLoggingListener.initEventLog(bstream, false, null)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 080b4afb55..55e7f5333c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -89,7 +89,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
 
   test("Replay compressed inprogress log file succeeding on partial read") {
     val buffered = new ByteArrayOutputStream
     val codec = new LZ4CompressionCodec(new SparkConf())
-    val compstream = codec.compressedOutputStream(buffered)
+    val compstream = codec.compressedContinuousOutputStream(buffered)
     val cwriter = new OutputStreamWriter(compstream, StandardCharsets.UTF_8)
     Utils.tryWithResource(new PrintWriter(cwriter)) { writer =>
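
For context on the zstd-jni behavior this patch relies on, here is a minimal sketch (not part of the patch; the object name `CloseFrameOnFlushDemo` and the sample payload are made up for illustration, and it assumes zstd-jni on the classpath) of how `closeFrameOnFlush` on the writer pairs with the continuous mode that `compressedContinuousInputStream` uses on the reader:

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets

import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}

object CloseFrameOnFlushDemo {
  def main(args: Array[String]): Unit = {
    val sink = new ByteArrayOutputStream()

    // Writer side: with closeFrameOnFlush enabled, every flush() ends the current
    // zstd frame, so the bytes written so far form a complete, decodable unit.
    val out = new ZstdOutputStream(sink, 3).setCloseFrameOnFlush(true)
    out.write("sample event log line\n".getBytes(StandardCharsets.UTF_8))
    out.flush() // frame closed here; the output stream itself is still open

    // Reader side: continuous mode keeps reading across frames of data that is
    // still being written, which is how SHS reads "inprogress" event logs.
    val in = new ZstdInputStream(new ByteArrayInputStream(sink.toByteArray)).setContinuous(true)
    val buf = new Array[Byte](8192)
    val n = in.read(buf)
    // Prints the flushed line even though the writer has not been closed.
    println(new String(buf, 0, n, StandardCharsets.UTF_8))

    in.close()
    out.close()
  }
}
```

Without closing the frame on flush, the flushed data stays inside an open frame, and the continuous reader waits for the frame to be closed, which is the SHS hang described above.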