[SPARK-34647][CORE] Use ZSTD JNI NoFinalizer classes and bump to 1.4.8-7

### What changes were proposed in this pull request?

This PR aims to use `ZstdInputStreamNoFinalizer` and `ZstdOutputStreamNoFinalizer` classes and upgrade ZSTD JNI to 1.4.8-7.

### Why are the changes needed?

`1.4.8-7` makes `NoFinalizer` classes public again. This improves the performance.
- 57d53a09d2

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #31762 from dongjoon-hyun/SPARK-ZSTD-NOFINALIZER.

Lead-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Dongjoon Hyun 2021-03-06 10:32:27 -08:00
parent 616c818e7c
commit 1f6089b165
6 changed files with 34 additions and 32 deletions

View file

@ -6,22 +6,22 @@ OpenJDK 64-Bit Server VM 11.0.10+9-Ubuntu-0ubuntu1.18.04 on Linux 4.15.0-1044-aw
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------------
Compression 10000 times at level 1 without buffer pool 1165 1255 128 0.0 116483.1 1.0X
Compression 10000 times at level 2 without buffer pool 932 981 42 0.0 93225.9 1.2X
Compression 10000 times at level 3 without buffer pool 1176 1216 55 0.0 117634.8 1.0X
Compression 10000 times at level 1 with buffer pool 415 424 5 0.0 41470.4 2.8X
Compression 10000 times at level 2 with buffer pool 493 503 7 0.0 49295.1 2.4X
Compression 10000 times at level 3 with buffer pool 754 759 6 0.0 75403.9 1.5X
Compression 10000 times at level 1 without buffer pool 800 801 1 0.0 80017.0 1.0X
Compression 10000 times at level 2 without buffer pool 703 704 1 0.0 70331.6 1.1X
Compression 10000 times at level 3 without buffer pool 935 938 5 0.0 93488.6 0.9X
Compression 10000 times at level 1 with buffer pool 396 398 2 0.0 39645.3 2.0X
Compression 10000 times at level 2 with buffer pool 477 477 0 0.0 47726.1 1.7X
Compression 10000 times at level 3 with buffer pool 699 700 1 0.0 69906.8 1.1X
OpenJDK 64-Bit Server VM 11.0.10+9-Ubuntu-0ubuntu1.18.04 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------------------------
Decompression 10000 times from level 1 without buffer pool 1102 1103 2 0.0 110154.5 1.0X
Decompression 10000 times from level 2 without buffer pool 1019 1057 54 0.0 101865.8 1.1X
Decompression 10000 times from level 3 without buffer pool 1020 1067 65 0.0 102046.4 1.1X
Decompression 10000 times from level 1 with buffer pool 590 596 4 0.0 58957.2 1.9X
Decompression 10000 times from level 2 with buffer pool 590 594 4 0.0 58979.5 1.9X
Decompression 10000 times from level 3 with buffer pool 592 598 5 0.0 59164.3 1.9X
Decompression 10000 times from level 1 without buffer pool 784 785 1 0.0 78383.2 1.0X
Decompression 10000 times from level 2 without buffer pool 785 785 0 0.0 78467.7 1.0X
Decompression 10000 times from level 3 without buffer pool 785 785 0 0.0 78471.3 1.0X
Decompression 10000 times from level 1 with buffer pool 573 574 0 0.0 57325.7 1.4X
Decompression 10000 times from level 2 with buffer pool 575 575 0 0.0 57505.9 1.4X
Decompression 10000 times from level 3 with buffer pool 575 576 1 0.0 57531.1 1.4X

View file

@ -6,22 +6,22 @@ OpenJDK 64-Bit Server VM 1.8.0_282-8u282-b08-0ubuntu1~18.04-b08 on Linux 4.15.0-
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------------
Compression 10000 times at level 1 without buffer pool 1382 1418 50 0.0 138213.9 1.0X
Compression 10000 times at level 2 without buffer pool 968 1012 63 0.0 96784.1 1.4X
Compression 10000 times at level 3 without buffer pool 1203 1247 63 0.0 120266.3 1.1X
Compression 10000 times at level 1 with buffer pool 431 439 5 0.0 43117.4 3.2X
Compression 10000 times at level 2 with buffer pool 515 522 5 0.0 51484.4 2.7X
Compression 10000 times at level 3 with buffer pool 753 759 6 0.0 75321.5 1.8X
Compression 10000 times at level 1 without buffer pool 830 855 21 0.0 83027.5 1.0X
Compression 10000 times at level 2 without buffer pool 698 699 1 0.0 69824.4 1.2X
Compression 10000 times at level 3 without buffer pool 929 930 1 0.0 92897.5 0.9X
Compression 10000 times at level 1 with buffer pool 403 404 1 0.0 40329.7 2.1X
Compression 10000 times at level 2 with buffer pool 489 490 1 0.0 48921.1 1.7X
Compression 10000 times at level 3 with buffer pool 714 716 3 0.0 71353.6 1.2X
OpenJDK 64-Bit Server VM 1.8.0_282-8u282-b08-0ubuntu1~18.04-b08 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Benchmark ZStandardCompressionCodec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------------------------
Decompression 10000 times from level 1 without buffer pool 1037 1074 52 0.0 103699.4 1.0X
Decompression 10000 times from level 2 without buffer pool 1040 1077 52 0.0 104009.1 1.0X
Decompression 10000 times from level 3 without buffer pool 1041 1083 60 0.0 104099.2 1.0X
Decompression 10000 times from level 1 with buffer pool 597 605 5 0.0 59720.1 1.7X
Decompression 10000 times from level 2 with buffer pool 603 607 3 0.0 60275.4 1.7X
Decompression 10000 times from level 3 with buffer pool 596 603 5 0.0 59602.2 1.7X
Decompression 10000 times from level 1 without buffer pool 790 791 1 0.0 79013.4 1.0X
Decompression 10000 times from level 2 without buffer pool 788 789 1 0.0 78787.6 1.0X
Decompression 10000 times from level 3 without buffer pool 787 787 1 0.0 78655.3 1.0X
Decompression 10000 times from level 1 with buffer pool 573 574 1 0.0 57279.4 1.4X
Decompression 10000 times from level 2 with buffer pool 582 582 1 0.0 58187.6 1.4X
Decompression 10000 times from level 3 with buffer pool 583 584 1 0.0 58330.5 1.4X

View file

@ -20,7 +20,7 @@ package org.apache.spark.io
import java.io._
import java.util.Locale
import com.github.luben.zstd.{NoPool, RecyclingBufferPool, ZstdInputStream, ZstdOutputStream}
import com.github.luben.zstd.{NoPool, RecyclingBufferPool, ZstdInputStreamNoFinalizer, ZstdOutputStreamNoFinalizer}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
import net.jpountz.xxhash.XXHashFactory
@ -226,21 +226,22 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
// Wrap the zstd output stream in a buffered output stream, so that we can
// avoid overhead excessive of JNI call while trying to compress small amount of data.
val os = new ZstdOutputStream(s, bufferPool).setLevel(level)
val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level)
new BufferedOutputStream(os, bufferSize)
}
override private[spark] def compressedContinuousOutputStream(s: OutputStream) = {
// SPARK-29322: Set "closeFrameOnFlush" to 'true' to let continuous input stream not being
// stuck on reading open frame.
val os = new ZstdOutputStream(s, bufferPool).setLevel(level).setCloseFrameOnFlush(true)
val os = new ZstdOutputStreamNoFinalizer(s, bufferPool)
.setLevel(level).setCloseFrameOnFlush(true)
new BufferedOutputStream(os, bufferSize)
}
override def compressedInputStream(s: InputStream): InputStream = {
// Wrap the zstd input stream in a buffered input stream so that we can
// avoid overhead excessive of JNI call while trying to uncompress small amount of data.
new BufferedInputStream(new ZstdInputStream(s, bufferPool), bufferSize)
new BufferedInputStream(new ZstdInputStreamNoFinalizer(s, bufferPool), bufferSize)
}
override def compressedContinuousInputStream(s: InputStream): InputStream = {
@ -248,6 +249,7 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
// Reading). By default `isContinuous` is false, and when we try to read from open frames,
// `compressedInputStream` method above throws truncated error exception. This method set
// `isContinuous` true to allow reading from open frames.
new BufferedInputStream(new ZstdInputStream(s, bufferPool).setContinuous(true), bufferSize)
new BufferedInputStream(
new ZstdInputStreamNoFinalizer(s, bufferPool).setContinuous(true), bufferSize)
}
}

View file

@ -243,4 +243,4 @@ xz/1.8//xz-1.8.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zookeeper-jute/3.6.2//zookeeper-jute-3.6.2.jar
zookeeper/3.6.2//zookeeper-3.6.2.jar
zstd-jni/1.4.8-6//zstd-jni-1.4.8-6.jar
zstd-jni/1.4.8-7//zstd-jni-1.4.8-7.jar

View file

@ -211,4 +211,4 @@ xz/1.8//xz-1.8.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
zookeeper-jute/3.6.2//zookeeper-jute-3.6.2.jar
zookeeper/3.6.2//zookeeper-3.6.2.jar
zstd-jni/1.4.8-6//zstd-jni-1.4.8-6.jar
zstd-jni/1.4.8-7//zstd-jni-1.4.8-7.jar

View file

@ -705,7 +705,7 @@
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.4.8-6</version>
<version>1.4.8-7</version>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>