[SPARK-19112][CORE] Support for ZStandard codec

## What changes were proposed in this pull request?

Using zstd compression for Spark jobs spilling hundreds of TBs of data, we could reduce the amount of data written to disk by as much as 50%. This translates to a significant latency gain because of reduced disk I/O. CPU time degrades by 2-5% because of zstd compression overhead, but for jobs that are bottlenecked by disk I/O this hit is acceptable.

## Benchmark
Please note that this benchmark uses a real-world, compute-heavy production workload spilling TBs of data to disk.

| metric | zstd performance compared to LZ4 |
| ------------- | -----:|
| spill/shuffle bytes | -48% |
| CPU time | +3% |
| CPU reservation time | -40% |
| latency | -40% |

## How was this patch tested?

Tested by running a few jobs that spill a large amount of data on the cluster; the amount of intermediate data written to disk was reduced by as much as 50%.
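For anyone trying this out, the codec and the two tunables introduced by this patch are set like any other Spark configuration. A sketch of `spark-defaults.conf` (the level and buffer size shown are the defaults, so only the first line is strictly needed):

```
spark.io.compression.codec            zstd
spark.io.compression.zstd.level       1
spark.io.compression.zstd.bufferSize  32k
```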

Author: Sital Kedia <skedia@fb.com>

Closes #18805 from sitalkedia/skedia/upstream_zstd.
Sital Kedia, 2017-11-01 14:54:08 +01:00, committed by Herman van Hovell
parent 07f390a27d
commit 444bce1c98
10 changed files with 140 additions and 3 deletions


```diff
@@ -269,6 +269,8 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
      (BSD 3 Clause) DPark (https://github.com/douban/dpark/blob/master/LICENSE)
      (BSD 3 Clause) CloudPickle (https://github.com/cloudpipe/cloudpickle/blob/master/LICENSE)
+     (BSD 2 Clause) Zstd-jni (https://github.com/luben/zstd-jni/blob/master/LICENSE)
+     (BSD license) Zstd (https://github.com/facebook/zstd/blob/v1.3.1/LICENSE)

 ========================================================================
 MIT licenses
```


```diff
@@ -198,6 +198,10 @@
       <groupId>org.lz4</groupId>
       <artifactId>lz4-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.roaringbitmap</groupId>
       <artifactId>RoaringBitmap</artifactId>
```


```diff
@@ -20,6 +20,7 @@ package org.apache.spark.io
 import java.io._
 import java.util.Locale

+import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
@@ -50,13 +51,14 @@ private[spark] object CompressionCodec {
   private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
     (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
-      || codec.isInstanceOf[LZ4CompressionCodec])
+      || codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
   }

   private val shortCompressionCodecNames = Map(
     "lz4" -> classOf[LZ4CompressionCodec].getName,
     "lzf" -> classOf[LZFCompressionCodec].getName,
-    "snappy" -> classOf[SnappyCompressionCodec].getName)
+    "snappy" -> classOf[SnappyCompressionCodec].getName,
+    "zstd" -> classOf[ZStdCompressionCodec].getName)

   def getCodecName(conf: SparkConf): String = {
     conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
@@ -219,3 +221,33 @@ private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends Ou
     }
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * ZStandard implementation of [[org.apache.spark.io.CompressionCodec]]. For more
+ * details see http://facebook.github.io/zstd/
+ *
+ * @note The wire protocol for this codec is not guaranteed to be compatible across versions
+ * of Spark. This is intended for use as an internal compression utility within a single Spark
+ * application.
+ */
+@DeveloperApi
+class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
+
+  private val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
+  // Default the compression level to 1 because it is the fastest of all,
+  // with a reasonably high compression ratio.
+  private val level = conf.getInt("spark.io.compression.zstd.level", 1)
+
+  override def compressedOutputStream(s: OutputStream): OutputStream = {
+    // Wrap the zstd output stream in a buffered output stream, so that we can
+    // avoid the overhead of excessive JNI calls when compressing small amounts of data.
+    new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
+  }
+
+  override def compressedInputStream(s: InputStream): InputStream = {
+    // Wrap the zstd input stream in a buffered input stream so that we can
+    // avoid the overhead of excessive JNI calls when decompressing small amounts of data.
+    new BufferedInputStream(new ZstdInputStream(s), bufferSize)
+  }
+}
```
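For context, the buffering decision above can be exercised outside Spark with the raw zstd-jni streams. The sketch below mirrors the codec's wrapping order, with the buffer sitting outside the zstd stream so small reads and writes are batched before crossing the JNI boundary. `ZstdRoundTrip`, the payload, and the default sizes are illustrative, not part of the patch; it only assumes the `zstd-jni` artifact on the classpath:

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream}

import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}

object ZstdRoundTrip {
  // Compress with the same wrapping the codec uses: the BufferedOutputStream
  // batches small writes before they hit the JNI-backed ZstdOutputStream.
  def compress(data: Array[Byte], level: Int = 1, bufferSize: Int = 32 * 1024): Array[Byte] = {
    val sink = new ByteArrayOutputStream()
    val out = new BufferedOutputStream(new ZstdOutputStream(sink, level), bufferSize)
    out.write(data)
    out.close() // flushes and finishes the zstd frame
    sink.toByteArray
  }

  // Decompress, reading until the expected number of bytes has been restored.
  def decompress(data: Array[Byte], expectedLen: Int, bufferSize: Int = 32 * 1024): Array[Byte] = {
    val in = new BufferedInputStream(
      new ZstdInputStream(new ByteArrayInputStream(data)), bufferSize)
    val restored = new Array[Byte](expectedLen)
    var read = 0
    while (read < expectedLen) {
      val n = in.read(restored, read, expectedLen - read)
      require(n > 0, "unexpected end of stream")
      read += n
    }
    in.close()
    restored
  }

  def main(args: Array[String]): Unit = {
    // Repetitive payload, so compression should shrink it noticeably.
    val payload = Array.tabulate[Byte](1 << 16)(i => (i % 64).toByte)
    val packed = compress(payload)
    assert(packed.length < payload.length)
    assert(java.util.Arrays.equals(payload, decompress(packed, payload.length)))
    println(s"compressed ${payload.length} bytes down to ${packed.length}")
  }
}
```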


```diff
@@ -104,6 +104,24 @@ class CompressionCodecSuite extends SparkFunSuite {
     testConcatenationOfSerializedStreams(codec)
   }

+  test("zstd compression codec") {
+    val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)
+    assert(codec.getClass === classOf[ZStdCompressionCodec])
+    testCodec(codec)
+  }
+
+  test("zstd compression codec short form") {
+    val codec = CompressionCodec.createCodec(conf, "zstd")
+    assert(codec.getClass === classOf[ZStdCompressionCodec])
+    testCodec(codec)
+  }
+
+  test("zstd supports concatenation of serialized zstd") {
+    val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)
+    assert(codec.getClass === classOf[ZStdCompressionCodec])
+    testConcatenationOfSerializedStreams(codec)
+  }
+
   test("bad compression codec") {
     intercept[IllegalArgumentException] {
       CompressionCodec.createCodec(conf, "foobar")
```


```diff
@@ -189,3 +189,4 @@ xercesImpl-2.9.1.jar
 xmlenc-0.52.jar
 xz-1.0.jar
 zookeeper-3.4.6.jar
+zstd-jni-1.3.2-2.jar
```


```diff
@@ -190,3 +190,4 @@ xercesImpl-2.9.1.jar
 xmlenc-0.52.jar
 xz-1.0.jar
 zookeeper-3.4.6.jar
+zstd-jni-1.3.2-2.jar
```


```diff
@@ -889,7 +889,8 @@ Apart from these, the following properties are also available, and may be useful
     e.g.
     <code>org.apache.spark.io.LZ4CompressionCodec</code>,
     <code>org.apache.spark.io.LZFCompressionCodec</code>,
-    and <code>org.apache.spark.io.SnappyCompressionCodec</code>.
+    <code>org.apache.spark.io.SnappyCompressionCodec</code>,
+    and <code>org.apache.spark.io.ZStdCompressionCodec</code>.
   </td>
 </tr>
 <tr>
@@ -908,6 +909,23 @@ Apart from these, the following properties are also available, and may be useful
     is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
   </td>
 </tr>
+<tr>
+  <td><code>spark.io.compression.zstd.level</code></td>
+  <td>1</td>
+  <td>
+    Compression level for the Zstd compression codec. Increasing the compression level will
+    result in better compression at the expense of more CPU and memory.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.compression.zstd.bufferSize</code></td>
+  <td>32k</td>
+  <td>
+    Buffer size used when the Zstd compression codec is used. Lowering this size will lower
+    the shuffle memory usage when Zstd is used, but it might increase the compression cost
+    because of excessive JNI call overhead.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kryo.classesToRegister</code></td>
   <td>(none)</td>
```


```
@@ -0,0 +1,26 @@
Zstd-jni: JNI bindings to Zstd Library
Copyright (c) 2015-2016, Luben Karavelov/ All rights reserved.

BSD License

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice, this
  list of conditions and the following disclaimer in the documentation and/or
  other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
```

licenses/LICENSE-zstd.txt (new file, 30 lines)

```
@@ -0,0 +1,30 @@
BSD License

For Zstandard software

Copyright (c) 2016-present, Facebook, Inc. All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

* Neither the name Facebook nor the names of its contributors may be used to
  endorse or promote products derived from this software without specific
  prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
```


```diff
@@ -537,6 +537,11 @@
         <artifactId>lz4-java</artifactId>
         <version>1.4.0</version>
       </dependency>
+      <dependency>
+        <groupId>com.github.luben</groupId>
+        <artifactId>zstd-jni</artifactId>
+        <version>1.3.2-2</version>
+      </dependency>
       <dependency>
         <groupId>com.clearspring.analytics</groupId>
         <artifactId>stream</artifactId>
```