From 2e31e2c5f30742c312767f26b17396c4ecfbef72 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 23 Feb 2021 16:37:29 -0800 Subject: [PATCH] [SPARK-34503][CORE] Use zstd for spark.eventLog.compression.codec by default ### What changes were proposed in this pull request? Apache Spark 3.0 introduced `spark.eventLog.compression.codec` configuration. For Apache Spark 3.2, this PR aims to set `zstd` as the default value for `spark.eventLog.compression.codec` configuration. This only affects creating a new log file. ### Why are the changes needed? The main purpose of event logs is archiving. Many logs are generated and occupy the storage, but most of them are never accessed by users. **1. Save storage resources (and money)** In general, ZSTD is much smaller than LZ4. For example, in case of TPCDS (Scale 200) log, ZSTD generates about 3 times smaller log files than LZ4. | CODEC | SIZE (bytes) | |---------|-------------| | LZ4 | 184001434| | ZSTD | 64522396| And, the plain file is 17.6 times bigger. ``` -rw-r--r-- 1 dongjoon staff 1135464691 Feb 21 22:31 spark-a1843ead29834f46b1125a03eca32679 -rw-r--r-- 1 dongjoon staff 64522396 Feb 21 22:31 spark-a1843ead29834f46b1125a03eca32679.zstd ``` **2. Better Usability** We cannot decompress Spark-generated LZ4 event log files via CLI while we can for ZSTD event log files. Spark's LZ4 event log files are inconvenient to some users who want to uncompress and access them. ``` $ lz4 -d spark-d3deba027bd34435ba849e14fc2c42ef.lz4 Decoding file spark-d3deba027bd34435ba849e14fc2c42ef Error 44 : Unrecognized header : file cannot be decoded ``` ``` $ zstd -d spark-a1843ead29834f46b1125a03eca32679.zstd spark-a1843ead29834f46b1125a03eca32679.zstd: 1135464691 bytes ``` **3. Speed** The following results are collected by running [lzbench](https://github.com/inikep/lzbench) on the above Spark event log. Note that - This is not a direct comparison of Spark compression/decompression codec. - `lzbench` is an in-memory benchmark. So, it doesn't show the benefit of the reduced network traffic due to the small size of ZSTD. Here, - To get ZSTD 1.4.8-1 result, `lzbench` `master` branch is used because Spark is using ZSTD 1.4.8. - To get LZ4 1.7.5 result, `lzbench` `v1.7` branch is used because Spark is using LZ4 1.7.1. ``` Compressor name Compress. Decompress. Compr. size Ratio Filename memcpy 7393 MB/s 7166 MB/s 1135464691 100.00 spark-a1843ead29834f46b1125a03eca32679 zstd 1.4.8 -1 1344 MB/s 3351 MB/s 56665767 4.99 spark-a1843ead29834f46b1125a03eca32679 lz4 1.7.5 1385 MB/s 4782 MB/s 127662168 11.24 spark-a1843ead29834f46b1125a03eca32679 ``` ### Does this PR introduce _any_ user-facing change? - No for the apps which doesn't use `spark.eventLog.compress` because `spark.eventLog.compress` is disabled by default. - No for the apps using `spark.eventLog.compression.codec` explicitly because this is a change of the default value. - Yes for the apps using `spark.eventLog.compress` without setting `spark.eventLog.compression.codec`. In this case, previously `spark.io.compression.codec` value was used whose default is `lz4`. So this JIRA issue, SPARK-34503, is labeled with `releasenotes`. ### How was this patch tested? Pass the updated UT. Closes #31618 from dongjoon-hyun/SPARK-34503. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/internal/config/package.scala | 5 +++-- .../deploy/history/EventLogFileWritersSuite.scala | 10 ++-------- docs/configuration.md | 5 ++--- docs/core-migration-guide.md | 2 ++ 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 3101bb6632..4a2281a4e8 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1726,9 +1726,10 @@ package object config { ConfigBuilder("spark.eventLog.compression.codec") .doc("The codec used to compress event log. By default, Spark provides four codecs: " + "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " + - "the codec. If this is not given, spark.io.compression.codec will be used.") + "the codec.") .version("3.0.0") - .fallbackConf(IO_COMPRESSION_CODEC) + .stringConf + .createWithDefault("zstd") private[spark] val BUFFER_SIZE = ConfigBuilder("spark.buffer.size") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index e9b739ce7a..e6dd9ae422 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -99,7 +99,7 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon } } - test("spark.eventLog.compression.codec overrides spark.io.compression.codec") { + test("Use the defalut value of spark.eventLog.compression.codec") { val conf = new SparkConf conf.set(EVENT_LOG_COMPRESS, true) val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) @@ -107,14 +107,8 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon val appId = "test" val appAttemptId = None - // The default value is `spark.io.compression.codec`. val writer = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) - assert(writer.compressionCodecName.contains("lz4")) - - // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`. - conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd") - val writer2 = createWriter(appId, appAttemptId, testDirPath.toUri, conf, hadoopConf) - assert(writer2.compressionCodecName.contains("zstd")) + assert(writer.compressionCodecName === EVENT_LOG_COMPRESSION_CODEC.defaultValue) } protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = { diff --git a/docs/configuration.md b/docs/configuration.md index 612d62a96f..b7b00dd42d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1040,10 +1040,9 @@ Apart from these, the following properties are also available, and may be useful spark.eventLog.compression.codec - + zstd - The codec to compress logged events. If this is not given, - spark.io.compression.codec will be used. + The codec to compress logged events. 3.0.0 diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md index ec7c3ab9cb..232b9e31ad 100644 --- a/docs/core-migration-guide.md +++ b/docs/core-migration-guide.md @@ -24,6 +24,8 @@ license: | ## Upgrading from Core 3.1 to 3.2 +- Since Spark 3.2, `spark.eventLog.compression.codec` is set to `zstd` by default which means Spark will not fallback to use `spark.io.compression.codec` anymore. + - Since Spark 3.2, `spark.storage.replication.proactive` is enabled by default which means Spark tries to replenish in case of the loss of cached RDD block replicas due to executor failures. To restore the behavior before Spark 3.2, you can set `spark.storage.replication.proactive` to `false`. ## Upgrading from Core 3.0 to 3.1