diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index adbd59c9f0..5205a2d568 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -44,6 +44,10 @@ trait CompressionCodec {
 
   def compressedOutputStream(s: OutputStream): OutputStream
 
+  private[spark] def compressedContinuousOutputStream(s: OutputStream): OutputStream = {
+    compressedOutputStream(s)
+  }
+
   def compressedInputStream(s: InputStream): InputStream
 
   private[spark] def compressedContinuousInputStream(s: InputStream): InputStream = {
@@ -220,6 +224,12 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
   }
 
+  override private[spark] def compressedContinuousOutputStream(s: OutputStream) = {
+    // SPARK-29322: Set "closeFrameOnFlush" to 'true' so that the continuous input stream
+    // does not get stuck reading an open frame.
+    new BufferedOutputStream(new ZstdOutputStream(s, level).setCloseFrameOnFlush(true), bufferSize)
+  }
+
   override def compressedInputStream(s: InputStream): InputStream = {
     // Wrap the zstd input stream in a buffered input stream so that we can
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 0c0259f12b..a0a4428dc7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -126,7 +126,8 @@ private[spark] class EventLoggingListener(
     }
 
     try {
-      val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+      val cstream = compressionCodec.map(_.compressedContinuousOutputStream(dstream))
+        .getOrElse(dstream)
       val bstream = new BufferedOutputStream(cstream, outputBufferSize)
 
       EventLoggingListener.initEventLog(bstream, testing, loggedEvents)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 9a2198e510..1d465ba373 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1254,7 +1254,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
   private def writeFile(file: File, codec: Option[CompressionCodec],
     events: SparkListenerEvent*) = {
     val fstream = new FileOutputStream(file)
-    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+    val cstream = codec.map(_.compressedContinuousOutputStream(fstream)).getOrElse(fstream)
     val bstream = new BufferedOutputStream(cstream)
 
     EventLoggingListener.initEventLog(bstream, false, null)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 080b4afb55..55e7f5333c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -89,7 +89,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Replay compressed inprogress log file succeeding on partial read") {
     val buffered = new ByteArrayOutputStream
     val codec = new LZ4CompressionCodec(new SparkConf())
-    val compstream = codec.compressedOutputStream(buffered)
+    val compstream = codec.compressedContinuousOutputStream(buffered)
     val cwriter = new OutputStreamWriter(compstream, StandardCharsets.UTF_8)
     Utils.tryWithResource(new PrintWriter(cwriter)) { writer =>