From 444bce1c98c45147fe63e2132e9743a0c5e49598 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Wed, 1 Nov 2017 14:54:08 +0100 Subject: [PATCH] [SPARK-19112][CORE] Support for ZStandard codec ## What changes were proposed in this pull request? Using zstd compression for Spark jobs spilling 100s of TBs of data, we could reduce the amount of data written to disk by as much as 50%. This translates to significant latency gain because of reduced disk io operations. There is a degradation in CPU time by 2 - 5% because of zstd compression overhead, but for jobs which are bottlenecked by disk IO, this hit can be taken. ## Benchmark Please note that this benchmark is using real world compute heavy production workload spilling TBs of data to disk | | zstd performance as compared to LZ4 | | ------------- | -----:| | spill/shuffle bytes | -48% | | cpu time | + 3% | | cpu reservation time | -40%| | latency | -40% | ## How was this patch tested? Tested by running a few jobs spilling a large amount of data on the cluster and amount of intermediate data written to disk reduced by as much as 50%. Author: Sital Kedia Closes #18805 from sitalkedia/skedia/upstream_zstd. --- LICENSE | 2 ++ core/pom.xml | 4 +++ .../apache/spark/io/CompressionCodec.scala | 36 +++++++++++++++++-- .../spark/io/CompressionCodecSuite.scala | 18 ++++++++++ dev/deps/spark-deps-hadoop-2.6 | 1 + dev/deps/spark-deps-hadoop-2.7 | 1 + docs/configuration.md | 20 ++++++++++- licenses/LICENSE-zstd-jni.txt | 26 ++++++++++++++ licenses/LICENSE-zstd.txt | 30 ++++++++++++++++ pom.xml | 5 +++ 10 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 licenses/LICENSE-zstd-jni.txt create mode 100644 licenses/LICENSE-zstd.txt diff --git a/LICENSE b/LICENSE index 39fe0dc462..c2b0d72663 100644 --- a/LICENSE +++ b/LICENSE @@ -269,6 +269,8 @@ The text of each license is also included at licenses/LICENSE-[project].txt. 
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE) (BSD 3 Clause) DPark (https://github.com/douban/dpark/blob/master/LICENSE) (BSD 3 Clause) CloudPickle (https://github.com/cloudpipe/cloudpickle/blob/master/LICENSE) + (BSD 2 Clause) Zstd-jni (https://github.com/luben/zstd-jni/blob/master/LICENSE) + (BSD license) Zstd (https://github.com/facebook/zstd/blob/v1.3.1/LICENSE) ======================================================================== MIT licenses diff --git a/core/pom.xml b/core/pom.xml index 54f7a34a6c..fa138d3e7a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -198,6 +198,10 @@ org.lz4 lz4-java + + com.github.luben + zstd-jni + org.roaringbitmap RoaringBitmap diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 27f2e42939..7722db56ee 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -20,6 +20,7 @@ package org.apache.spark.io import java.io._ import java.util.Locale +import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream} import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} @@ -50,13 +51,14 @@ private[spark] object CompressionCodec { private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = { (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] - || codec.isInstanceOf[LZ4CompressionCodec]) + || codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec]) } private val shortCompressionCodecNames = Map( "lz4" -> classOf[LZ4CompressionCodec].getName, "lzf" -> classOf[LZFCompressionCodec].getName, - "snappy" -> classOf[SnappyCompressionCodec].getName) + "snappy" -> 
classOf[SnappyCompressionCodec].getName, + "zstd" -> classOf[ZStdCompressionCodec].getName) def getCodecName(conf: SparkConf): String = { conf.get(configKey, DEFAULT_COMPRESSION_CODEC) @@ -219,3 +221,33 @@ private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends Ou } } } + +/** + * :: DeveloperApi :: + * ZStandard implementation of [[org.apache.spark.io.CompressionCodec]]. For more + * details see - http://facebook.github.io/zstd/ + * + * @note The wire protocol for this codec is not guaranteed to be compatible across versions + * of Spark. This is intended for use as an internal compression utility within a single Spark + * application. + */ +@DeveloperApi +class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec { + + private val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt + // Default compression level for zstd compression is 1 because it is + // fastest of all with reasonably high compression ratio. + private val level = conf.getInt("spark.io.compression.zstd.level", 1) + + override def compressedOutputStream(s: OutputStream): OutputStream = { + // Wrap the zstd output stream in a buffered output stream, so that we can + // avoid excessive overhead of JNI calls while trying to compress small amounts of data. + new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize) + } + + override def compressedInputStream(s: InputStream): InputStream = { + // Wrap the zstd input stream in a buffered input stream so that we can + // avoid excessive overhead of JNI calls while trying to uncompress small amounts of data. 
+ new BufferedInputStream(new ZstdInputStream(s), bufferSize) + } +} diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index 9e9c2b0165..7b40e3e582 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -104,6 +104,24 @@ class CompressionCodecSuite extends SparkFunSuite { testConcatenationOfSerializedStreams(codec) } + test("zstd compression codec") { + val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName) + assert(codec.getClass === classOf[ZStdCompressionCodec]) + testCodec(codec) + } + + test("zstd compression codec short form") { + val codec = CompressionCodec.createCodec(conf, "zstd") + assert(codec.getClass === classOf[ZStdCompressionCodec]) + testCodec(codec) + } + + test("zstd supports concatenation of serialized zstd") { + val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName) + assert(codec.getClass === classOf[ZStdCompressionCodec]) + testConcatenationOfSerializedStreams(codec) + } + test("bad compression codec") { intercept[IllegalArgumentException] { CompressionCodec.createCodec(conf, "foobar") diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 6e2fc63d67..21c8a75796 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -189,3 +189,4 @@ xercesImpl-2.9.1.jar xmlenc-0.52.jar xz-1.0.jar zookeeper-3.4.6.jar +zstd-jni-1.3.2-2.jar diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index c2bbc253d7..7173426c7b 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -190,3 +190,4 @@ xercesImpl-2.9.1.jar xmlenc-0.52.jar xz-1.0.jar zookeeper-3.4.6.jar +zstd-jni-1.3.2-2.jar diff --git a/docs/configuration.md b/docs/configuration.md index d3c358bb74..9b9583d916 100644 --- 
a/docs/configuration.md +++ b/docs/configuration.md @@ -889,7 +889,8 @@ Apart from these, the following properties are also available, and may be useful e.g. org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, - and org.apache.spark.io.SnappyCompressionCodec. + org.apache.spark.io.SnappyCompressionCodec, + and org.apache.spark.io.ZStdCompressionCodec. @@ -908,6 +909,23 @@ Apart from these, the following properties are also available, and may be useful is used. Lowering this block size will also lower shuffle memory usage when Snappy is used. + + spark.io.compression.zstd.level + 1 + + Compression level for Zstd compression codec. Increasing the compression level will result in better + compression at the expense of more CPU and memory. + + + + spark.io.compression.zstd.bufferSize + 32k + + Buffer size used in Zstd compression, in the case when Zstd compression codec + is used. Lowering this size will lower the shuffle memory usage when Zstd is used, but it + might increase the compression cost because of excessive JNI call overhead. + + spark.kryo.classesToRegister (none) diff --git a/licenses/LICENSE-zstd-jni.txt b/licenses/LICENSE-zstd-jni.txt new file mode 100644 index 0000000000..32c6bbdd98 --- /dev/null +++ b/licenses/LICENSE-zstd-jni.txt @@ -0,0 +1,26 @@ +Zstd-jni: JNI bindings to Zstd Library + +Copyright (c) 2015-2016, Luben Karavelov/ All rights reserved. + +BSD License + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/licenses/LICENSE-zstd.txt b/licenses/LICENSE-zstd.txt new file mode 100644 index 0000000000..a793a80289 --- /dev/null +++ b/licenses/LICENSE-zstd.txt @@ -0,0 +1,30 @@ +BSD License + +For Zstandard software + +Copyright (c) 2016-present, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/pom.xml b/pom.xml index 2d59f06811..652aed4d12 100644 --- a/pom.xml +++ b/pom.xml @@ -537,6 +537,11 @@ lz4-java 1.4.0 + + com.github.luben + zstd-jni + 1.3.2-2 + com.clearspring.analytics stream