[SPARK-24519] Make the threshold for highly compressed map status configurable

**Problem**
MapStatus uses a hardcoded value of 2000 partitions to determine whether it should use HighlyCompressedMapStatus. We should make this configurable so users can more easily tune their jobs in this respect without having to modify their code to change the number of partitions. Note that we can leave this as an internal/undocumented config for now, until we have more advice for users on how to set it.
Some of my reasoning:
The config gives you a way to change this easily without having to change code, redeploy the jar, and run again; you simply change the config and rerun. It also allows for easier experimentation. Changing the number of partitions has other side effects, good or bad depending on the situation: it can increase the number of output files when you don't want it to, and it affects the number of tasks needed and thus the number of executors required to run in parallel, etc.
There have been various talks about this number at Spark Summits where people have told customers to increase it to 2001 partitions. A search for "spark 2000 partitions" will find various discussions of this number, which shows that people are already modifying their code to take it into account, so it seems to me that making it configurable would be better.
Once we have more advice for users, we can expose this config and document it.

**What changes were proposed in this pull request?**
I made the hardcoded value mentioned above configurable under the name _SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS_, with a default value of 2000. Users can set it to the value they want via the property name _spark.shuffle.minNumPartitionsToHighlyCompress_.
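For illustration, a user could override the threshold through the usual `SparkConf` string-key mechanism; this is a minimal sketch, and the chosen value of 1000 is just an example:

```scala
import org.apache.spark.SparkConf

// Sketch: overriding the internal threshold in user code. The string key
// matches the ConfigBuilder name introduced by this patch; jobs whose map
// output has more than 1000 partitions would then use
// HighlyCompressedMapStatus.
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.shuffle.minNumPartitionsToHighlyCompress", "1000")
```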

**How was this patch tested?**
I wrote a unit test to make sure that the default value is 2000, and that an _IllegalArgumentException_ is thrown if the user sets it to a non-positive value. The unit test also checks that HighlyCompressedMapStatus is correctly used when the number of partitions is greater than _SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS_.

Author: Hieu Huynh <"Hieu.huynh@oath.com">

Closes #21527 from hthuynh2/spark_branch_1.
Hieu Huynh, 2018-06-22 09:16:14 -05:00, committed by Thomas Graves
parent 92c2f00bd2
commit 39dfaf2fd1
3 changed files with 38 additions and 1 deletion

```scala
@@ -552,4 +552,11 @@ package object config {
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("1h")

  private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
    ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
      .internal()
      .doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
      .intConf
      .checkValue(v => v > 0, "The value should be a positive integer.")
      .createWithDefault(2000)
}
```

```scala
@@ -50,7 +50,9 @@ private[spark] sealed trait MapStatus {
private[spark] object MapStatus {
  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > Option(SparkEnv.get)
      .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
      .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
```
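The `Option(SparkEnv.get)` wrapping in the hunk above guards against `SparkEnv.get` returning null (for example, in contexts where no environment has been set up), falling back to the config's compile-time default. A Spark-free sketch of that lookup-with-default pattern, with illustrative names that are not Spark APIs:

```scala
// Sketch only: "envConf" and "minParts" are hypothetical stand-ins for
// SparkEnv's conf and the threshold key.
def threshold(envConf: Option[Map[String, Int]]): Int =
  envConf.flatMap(_.get("minParts")).getOrElse(2000)

assert(threshold(None) == 2000)                        // no env: default
assert(threshold(Some(Map("minParts" -> 2001))) == 2001) // env wins
```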

```scala
@@ -188,4 +188,32 @@ class MapStatusSuite extends SparkFunSuite {
      assert(count === 3000)
    }
  }

  test("SPARK-24519: HighlyCompressedMapStatus has configurable threshold") {
    val conf = new SparkConf()
    val env = mock(classOf[SparkEnv])
    doReturn(conf).when(env).conf
    SparkEnv.set(env)
    val sizes = Array.fill[Long](500)(150L)
    // Test default value
    val status = MapStatus(null, sizes)
    assert(status.isInstanceOf[CompressedMapStatus])
    // Test non-positive values
    for (s <- -1 to 0) {
      assertThrows[IllegalArgumentException] {
        conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
        val status = MapStatus(null, sizes)
      }
    }
    // Test positive values
    Seq(1, 100, 499, 500, 501).foreach { s =>
      conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
      val status = MapStatus(null, sizes)
      if (sizes.length > s) {
        assert(status.isInstanceOf[HighlyCompressedMapStatus])
      } else {
        assert(status.isInstanceOf[CompressedMapStatus])
      }
    }
  }
}
```