[SPARK-34340][CORE] Support ZSTD JNI BufferPool

### What changes were proposed in this pull request?

This PR aims at two goals:
1. Support the ZSTD JNI `BufferPool` feature by adding a new configuration, `spark.io.compression.zstd.bufferPool.enabled`, for Apache Spark 3.2.0 (see the sketch below).
2. Make Spark independent of the ZSTD JNI library's default buffer pool policy changes.
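
For illustration, a minimal sketch of how a user could opt in once this lands. Selecting the `zstd` codec is an assumption for the example, not part of this PR:

```scala
import org.apache.spark.SparkConf

// Hypothetical user configuration: route block compression through zstd
// and enable the JNI buffer pool added by this PR.
val conf = new SparkConf()
  .set("spark.io.compression.codec", "zstd")
  .set("spark.io.compression.zstd.bufferPool.enabled", "true")
```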

### Why are the changes needed?

The ZSTD JNI library's buffer pool behavior has changed across versions:

| Version | Description | Commit |
| ---------- | --------------- | ----------- |
| v1.4.5-7 | `BufferPool` was added and used by default | 4f55c89172 |
| v1.4.5-8 | `RecyclingBufferPool` was added and `BufferPool` became an interface to allow custom buffer pool implementations | dd2588edd3 |
| v1.4.7+ | `NoPool` is used by default and users must specify a buffer pool explicitly | f7c8279bc1 |
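
To make the last row concrete, here is a minimal sketch (not Spark code) of the explicit pool choice that zstd-jni v1.4.7+ requires; the object name and the `useBufferPool` flag are illustrative stand-ins for the new Spark configuration:

```scala
import java.io.ByteArrayOutputStream

import com.github.luben.zstd.{BufferPool, NoPool, RecyclingBufferPool, ZstdOutputStream}

object PoolChoiceSketch {
  def main(args: Array[String]): Unit = {
    val useBufferPool = true // stand-in for spark.io.compression.zstd.bufferPool.enabled
    // Since v1.4.7 there is no implicit pooling: the caller picks the pool.
    // NoPool matches the library's current default behavior.
    val pool: BufferPool = if (useBufferPool) RecyclingBufferPool.INSTANCE else NoPool.INSTANCE
    val out = new ZstdOutputStream(new ByteArrayOutputStream(), pool)
    out.write("hello zstd".getBytes("UTF-8"))
    out.close()
  }
}
```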

### Does this PR introduce _any_ user-facing change?

No. The default value (`false`) matches the current ZSTD JNI library default (`NoPool`), so behavior is unchanged unless the user opts in.

### How was this patch tested?

Pass the CIs with the updated unit test.

Closes #31453 from dongjoon-hyun/SPARK-34340.

Lead-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

`core/src/main/scala/org/apache/spark/internal/config/package.scala`:

```diff
@@ -1680,6 +1680,13 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("32k")
 
+  private[spark] val IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED =
+    ConfigBuilder("spark.io.compression.zstd.bufferPool.enabled")
+      .doc("If true, enable buffer pool of ZSTD JNI library.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val IO_COMPRESSION_ZSTD_LEVEL =
     ConfigBuilder("spark.io.compression.zstd.level")
       .doc("Compression level for Zstd compression codec. Increasing the compression " +
```

`core/src/main/scala/org/apache/spark/io/CompressionCodec.scala`:

```diff
@@ -20,7 +20,7 @@ package org.apache.spark.io
 import java.io._
 import java.util.Locale
 
-import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
+import com.github.luben.zstd.{NoPool, RecyclingBufferPool, ZstdInputStream, ZstdOutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
 import net.jpountz.xxhash.XXHashFactory
@@ -217,22 +217,30 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
   // fastest of all with reasonably high compression ratio.
   private val level = conf.get(IO_COMPRESSION_ZSTD_LEVEL)
+  private val bufferPool = if (conf.get(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED)) {
+    RecyclingBufferPool.INSTANCE
+  } else {
+    NoPool.INSTANCE
+  }
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     // Wrap the zstd output stream in a buffered output stream, so that we can
     // avoid overhead excessive of JNI call while trying to compress small amount of data.
-    new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
+    val os = new ZstdOutputStream(s, bufferPool).setLevel(level)
+    new BufferedOutputStream(os, bufferSize)
   }
 
   override private[spark] def compressedContinuousOutputStream(s: OutputStream) = {
     // SPARK-29322: Set "closeFrameOnFlush" to 'true' to let continuous input stream not being
     // stuck on reading open frame.
-    new BufferedOutputStream(new ZstdOutputStream(s, level).setCloseFrameOnFlush(true), bufferSize)
+    val os = new ZstdOutputStream(s, bufferPool).setLevel(level).setCloseFrameOnFlush(true)
+    new BufferedOutputStream(os, bufferSize)
   }
 
   override def compressedInputStream(s: InputStream): InputStream = {
     // Wrap the zstd input stream in a buffered input stream so that we can
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
-    new BufferedInputStream(new ZstdInputStream(s), bufferSize)
+    new BufferedInputStream(new ZstdInputStream(s, bufferPool), bufferSize)
   }
 
   override def compressedContinuousInputStream(s: InputStream): InputStream = {
@@ -240,6 +248,6 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // Reading). By default `isContinuous` is false, and when we try to read from open frames,
     // `compressedInputStream` method above throws truncated error exception. This method set
     // `isContinuous` true to allow reading from open frames.
-    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
+    new BufferedInputStream(new ZstdInputStream(s, bufferPool).setContinuous(true), bufferSize)
   }
 }
```
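
For completeness, a minimal round-trip sketch of the updated codec with the pool enabled, in the spirit of what the suite below exercises via `testCodec`. The object name is hypothetical, the sketch must live under `org.apache.spark.io` because these are Spark-internal APIs, and `readAllBytes` (JDK 9+) stands in for the Guava `ByteStreams` helper the suite uses:

```scala
// Must sit in this package: the config constant is private[spark].
package org.apache.spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED

object ZstdRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Enable the RecyclingBufferPool path added by this PR.
    val conf = new SparkConf(false).set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED.key, "true")
    val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)

    // Compress a small payload, then decompress and verify the round trip.
    val payload = ("spark" * 1000).getBytes("UTF-8")
    val baos = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(baos)
    out.write(payload)
    out.close()

    val in = codec.compressedInputStream(new ByteArrayInputStream(baos.toByteArray))
    val decompressed = in.readAllBytes() // JDK 9+
    in.close()
    assert(java.util.Arrays.equals(decompressed, payload))
  }
}
```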

`core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala`:

```diff
@@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import com.google.common.io.ByteStreams
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED
 
 class CompressionCodecSuite extends SparkFunSuite {
   val conf = new SparkConf(false)
@@ -105,9 +106,12 @@ class CompressionCodecSuite extends SparkFunSuite {
   }
 
   test("zstd compression codec") {
-    val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)
-    assert(codec.getClass === classOf[ZStdCompressionCodec])
-    testCodec(codec)
+    Seq("true", "false").foreach { flag =>
+      val conf = new SparkConf(false).set(IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED.key, flag)
+      val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)
+      assert(codec.getClass === classOf[ZStdCompressionCodec])
+      testCodec(codec)
+    }
   }
 
   test("zstd compression codec short form") {
```