[SPARK-29939][CORE] Add spark.shuffle.mapStatus.compression.codec conf

### What changes were proposed in this pull request?

Add a new conf named `spark.shuffle.mapStatus.compression.codec` that lets users decide which codec should be used for `MapStatus` compression (default: `zstd`).
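
As an illustration, here is a minimal sketch of setting the new conf on a local job. The conf key and the codec short names come from this patch; the object name, app name, and the toy shuffle job are made up:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MapStatusCodecDemo {
  def main(args: Array[String]): Unit = {
    // Pick lz4 instead of the default zstd for MapStatus compression.
    // Valid short names are lz4, lzf, snappy and zstd; a fully qualified
    // codec class name should also work.
    val conf = new SparkConf()
      .setAppName("map-status-codec-demo") // hypothetical app name
      .setMaster("local[2]")
      .set("spark.shuffle.mapStatus.compression.codec", "lz4")
    val sc = new SparkContext(conf)
    try {
      // Trigger a shuffle so MapStatus objects are actually produced.
      println(sc.parallelize(1 to 1000, 4).map(x => (x % 10, 1)).reduceByKey(_ + _).count())
    } finally {
      sc.stop()
    }
  }
}
```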

### Why are the changes needed?

We already have this functionality for `broadcast`/`rdd`/`shuffle`/`shuffleSpill`,
so it makes sense to offer the same flexibility for `MapStatus` as well.

### Does this PR introduce any user-facing change?

Yes, users can now set `spark.shuffle.mapStatus.compression.codec` to decide which codec is used for `MapStatus` compression.
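
For a closer look at what the conf controls, here is a hedged sketch that mirrors the resolve/compress/decompress path shown in the diff below. Because the `CompressionCodec` factory is `private[spark]`, the sketch lives under the `org.apache.spark` namespace; the package and object names are made up:

```scala
package org.apache.spark.example

import java.io.{BufferedInputStream, ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

object MapStatusCodecRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.shuffle.mapStatus.compression.codec", "lz4")

    // Resolve the codec the same way the patched serialize/deserialize paths do.
    val codec = CompressionCodec.createCodec(
      conf, conf.get("spark.shuffle.mapStatus.compression.codec", "zstd"))

    // Compress a small payload through the codec's output stream.
    val raw = new ByteArrayOutputStream()
    val compressed = codec.compressedOutputStream(raw)
    compressed.write("dummy map status payload".getBytes("UTF-8"))
    compressed.close()

    // Read it back; buffering the input stream keeps per-read overhead low,
    // as the deserialize path does for the ZStd codec.
    val in = new BufferedInputStream(
      codec.compressedInputStream(new ByteArrayInputStream(raw.toByteArray)))
    val decoded = new ByteArrayOutputStream()
    val chunk = new Array[Byte](1024)
    var n = in.read(chunk)
    while (n != -1) {
      decoded.write(chunk, 0, n)
      n = in.read(chunk)
    }
    in.close()
    println(new String(decoded.toByteArray, "UTF-8")) // prints the original payload
  }
}
```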

### How was this patch tested?

N/A

Closes #26611 from Ngone51/SPARK-29939.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

@@ -902,7 +902,7 @@ private[spark] object MapOutputTracker extends Logging {
     // the contents don't have to be copied to the new buffer.
     val out = new ApacheByteArrayOutputStream()
     out.write(DIRECT)
-    val codec = CompressionCodec.createCodec(conf, "zstd")
+    val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC))
     val objOut = new ObjectOutputStream(codec.compressedOutputStream(out))
     Utils.tryWithSafeFinally {
       // Since statuses can be modified in parallel, sync on it
@@ -939,7 +939,7 @@ private[spark] object MapOutputTracker extends Logging {
     assert (bytes.length > 0)

     def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
-      val codec = CompressionCodec.createCodec(conf, "zstd")
+      val codec = CompressionCodec.createCodec(conf, conf.get(MAP_STATUS_COMPRESSION_CODEC))
       // The ZStd codec is wrapped in a `BufferedInputStream` which avoids overhead excessive
       // of JNI call while trying to decompress small amount of data for each element
       // of `MapStatuses`

@@ -1016,6 +1016,15 @@ package object config {
       .booleanConf
       .createWithDefault(true)

+  private[spark] val MAP_STATUS_COMPRESSION_CODEC =
+    ConfigBuilder("spark.shuffle.mapStatus.compression.codec")
+      .internal()
+      .doc("The codec used to compress MapStatus, which is generated by ShuffleMapTask. " +
+        "By default, Spark provides four codecs: lz4, lzf, snappy, and zstd. You can also " +
+        "use fully qualified class names to specify the codec.")
+      .stringConf
+      .createWithDefault("zstd")
+
   private[spark] val SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD =
     ConfigBuilder("spark.shuffle.spill.initialMemoryThreshold")
       .internal()