[SPARK-34503][CORE] Use zstd for spark.eventLog.compression.codec by default

### What changes were proposed in this pull request?

Apache Spark 3.0 introduced the `spark.eventLog.compression.codec` configuration.
For Apache Spark 3.2, this PR aims to set `zstd` as the default value of the `spark.eventLog.compression.codec` configuration.
This affects only newly created event log files.
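For illustration, with only event log compression enabled and no codec set, a Spark 3.2 application now produces a `.zstd` event log (a minimal sketch; the app name and event log directory are example values, and the directory must already exist):
```scala
import org.apache.spark.sql.SparkSession

// With spark.eventLog.compress on and spark.eventLog.compression.codec
// unset, Spark 3.2 writes <appId>.zstd (previously <appId>.lz4) under
// the event log directory.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("event-log-zstd-demo")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "/tmp/spark-events") // must exist beforehand
  .config("spark.eventLog.compress", "true")
  .getOrCreate()

spark.range(10).count()
spark.stop()
```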

### Why are the changes needed?

The main purpose of event logs is archiving. Many logs are generated and occupy storage, but most of them are never accessed by users.

**1. Save storage resources (and money)**

In general, ZSTD produces much smaller files than LZ4.
For example, for a TPCDS (scale factor 200) event log, ZSTD generates a log file roughly one third the size of LZ4's.

| CODEC | SIZE (bytes) |
|---------|-------------|
| LZ4         | 184001434|
| ZSTD      |  64522396|

For comparison, the plain (uncompressed) file is 17.6 times bigger than the ZSTD one.
```
-rw-r--r--    1 dongjoon  staff  1135464691 Feb 21 22:31 spark-a1843ead29834f46b1125a03eca32679
-rw-r--r--    1 dongjoon  staff    64522396 Feb 21 22:31 spark-a1843ead29834f46b1125a03eca32679.zstd
```

**2. Better Usability**

Spark-generated LZ4 event log files cannot be decompressed with the `lz4` CLI, while ZSTD event log files can be decompressed with the `zstd` CLI; Spark writes LZ4 in lz4-java's block format rather than the standard LZ4 frame format the CLI expects. This makes LZ4 event logs inconvenient for users who want to uncompress and inspect them.
```
$ lz4 -d spark-d3deba027bd34435ba849e14fc2c42ef.lz4
Decoding file spark-d3deba027bd34435ba849e14fc2c42ef
Error 44 : Unrecognized header : file cannot be decoded
```
```
$ zstd -d spark-a1843ead29834f46b1125a03eca32679.zstd
spark-a1843ead29834f46b1125a03eca32679.zstd: 1135464691 bytes
```
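For users who still need to unpack an existing LZ4 event log outside of Spark, a minimal sketch using lz4-java's block stream, which is what Spark's `LZ4CompressionCodec` wraps (file names are example values):
```scala
import java.io.{BufferedOutputStream, FileInputStream, FileOutputStream}

import net.jpountz.lz4.LZ4BlockInputStream

// Spark writes LZ4 event logs in lz4-java's block format, so
// LZ4BlockInputStream (not the `lz4` CLI frame decoder) can read them.
val in = new LZ4BlockInputStream(
  new FileInputStream("spark-d3deba027bd34435ba849e14fc2c42ef.lz4"))
val out = new BufferedOutputStream(
  new FileOutputStream("spark-d3deba027bd34435ba849e14fc2c42ef"))
try {
  val buf = new Array[Byte](8192)
  Iterator.continually(in.read(buf)).takeWhile(_ != -1).foreach(out.write(buf, 0, _))
} finally {
  in.close()
  out.close()
}
```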

**3. Speed**

The following results were collected by running [lzbench](https://github.com/inikep/lzbench) on the above Spark event log. Note that:
- This is not a direct comparison of Spark's compression/decompression codecs.
- `lzbench` is an in-memory benchmark, so it does not show the benefit of the reduced network traffic from ZSTD's smaller output.

Here,
- To get the ZSTD 1.4.8 (level 1) result, the `lzbench` `master` branch was used because Spark uses ZSTD 1.4.8.
- To get the LZ4 1.7.5 result, the `lzbench` `v1.7` branch was used because Spark uses LZ4 1.7.1.
```
Compressor name      Compress. Decompress. Compr. size  Ratio Filename
memcpy               7393 MB/s  7166 MB/s  1135464691 100.00 spark-a1843ead29834f46b1125a03eca32679
zstd 1.4.8 -1        1344 MB/s  3351 MB/s    56665767   4.99 spark-a1843ead29834f46b1125a03eca32679
lz4 1.7.5            1385 MB/s  4782 MB/s   127662168  11.24 spark-a1843ead29834f46b1125a03eca32679
```
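The numbers above should be reproducible along these lines (a sketch; `-e` selects the codec in `lzbench`, and the branch checkouts pin the codec versions used above):
```
$ git clone https://github.com/inikep/lzbench && cd lzbench && make
$ ./lzbench -ezstd spark-a1843ead29834f46b1125a03eca32679   # master branch: zstd 1.4.8
$ git checkout v1.7 && make clean && make
$ ./lzbench -elz4 spark-a1843ead29834f46b1125a03eca32679    # v1.7 branch: lz4 1.7.5
```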

### Does this PR introduce _any_ user-facing change?

- No for apps that don't use `spark.eventLog.compress`, because `spark.eventLog.compress` is disabled by default.
- No for apps that set `spark.eventLog.compression.codec` explicitly, because this PR only changes the default value.
- Yes for apps that enable `spark.eventLog.compress` without setting `spark.eventLog.compression.codec`. Previously, the `spark.io.compression.codec` value (default: `lz4`) was used in this case; a way to restore the old behavior is sketched below.

So this JIRA issue, SPARK-34503, is labeled with `releasenotes`.
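For apps in the last category that want to keep the pre-3.2 behavior, the old default can be pinned explicitly (a minimal `spark-defaults.conf` sketch):
```
spark.eventLog.compress             true
# Pin the pre-3.2 codec instead of relying on the removed fallback to
# spark.io.compression.codec.
spark.eventLog.compression.codec    lz4
```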

### How was this patch tested?

Pass the updated unit test.

Closes #31618 from dongjoon-hyun/SPARK-34503.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

`core/src/main/scala/org/apache/spark/internal/config/package.scala`
```diff
@@ -1726,9 +1726,10 @@ package object config {
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
         "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " +
-        "the codec. If this is not given, spark.io.compression.codec will be used.")
+        "the codec.")
       .version("3.0.0")
-      .fallbackConf(IO_COMPRESSION_CODEC)
+      .stringConf
+      .createWithDefault("zstd")

   private[spark] val BUFFER_SIZE =
     ConfigBuilder("spark.buffer.size")
```

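For context, the removed `.fallbackConf(IO_COMPRESSION_CODEC)` made an unset event log codec resolve through `spark.io.compression.codec`, while `.stringConf.createWithDefault("zstd")` gives the entry its own default. A rough illustration of the resolution change (simplified; resolving a `ConfigEntry` against a `SparkConf` is Spark-internal API):
```scala
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.io.compression.codec", "snappy")

// Before this PR: spark.eventLog.compression.codec was a fallback entry, so
// with no explicit value it resolved through spark.io.compression.codec and
// yielded "snappy" here.
// After this PR: the entry carries its own default, so with no explicit
// value it yields "zstd" regardless of spark.io.compression.codec.
```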
`core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala`
```diff
@@ -99,7 +99,7 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
     }
   }

-  test("spark.eventLog.compression.codec overrides spark.io.compression.codec") {
+  test("Use the default value of spark.eventLog.compression.codec") {
     val conf = new SparkConf
     conf.set(EVENT_LOG_COMPRESS, true)
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
@@ -107,14 +107,8 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
     val appId = "test"
     val appAttemptId = None

-    // The default value is `spark.io.compression.codec`.
     val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf)
-    assert(writer.compressionCodecName.contains("lz4"))
-
-    // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`.
-    conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
-    val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf)
-    assert(writer2.compressionCodecName.contains("zstd"))
+    assert(writer.compressionCodecName === EVENT_LOG_COMPRESSION_CODEC.defaultValue)
   }

   protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = {
```

`docs/configuration.md`
```diff
@@ -1040,10 +1040,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.eventLog.compression.codec</code></td>
-  <td></td>
+  <td>zstd</td>
   <td>
-    The codec to compress logged events. If this is not given,
-    <code>spark.io.compression.codec</code> will be used.
+    The codec to compress logged events.
   </td>
   <td>3.0.0</td>
 </tr>
```

`docs/core-migration-guide.md`
```diff
@@ -24,6 +24,8 @@ license: |

 ## Upgrading from Core 3.1 to 3.2

+- Since Spark 3.2, `spark.eventLog.compression.codec` is set to `zstd` by default which means Spark will not fallback to use `spark.io.compression.codec` anymore.
+
 - Since Spark 3.2, `spark.storage.replication.proactive` is enabled by default which means Spark tries to replenish in case of the loss of cached RDD block replicas due to executor failures. To restore the behavior before Spark 3.2, you can set `spark.storage.replication.proactive` to `false`.

 ## Upgrading from Core 3.0 to 3.1
```