spark-instrumented-optimizer/core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
Kazuyuki Tanimura 9149cad57d [SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with large spark.default.parallelism
### What changes were proposed in this pull request?
The current `MapOutputTracker` class may throw `NegativeArraySizeException` when the number of partitions is large. The `serializeOutputStatuses()` method compresses an array of mapStatuses and writes the binary data to an (Apache) `ByteArrayOutputStream`. Inside (Apache) `ByteArrayOutputStream.toByteArray()`, the exception is thrown because the index is an `int` and overflows (2GB limit) when the output binary is too large.
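
To make the overflow concrete, here is a minimal sketch in Scala. It is a simplified stand-in, not the actual internals of the Apache stream: the object and method names are hypothetical, and it only assumes that a byte count is kept in an `Int`.

```scala
object CounterOverflowSketch {
  // One GiB expressed as an Int (2^30).
  val OneGiB: Int = 1 << 30

  // Sum chunk sizes with an Int accumulator; this wraps around past Int.MaxValue.
  def intTotal(chunkSizes: Seq[Int]): Int = chunkSizes.foldLeft(0)(_ + _)

  // The same sum with a Long accumulator does not wrap for realistic sizes.
  def longTotal(chunkSizes: Seq[Int]): Long = chunkSizes.foldLeft(0L)(_ + _)

  // Allocating an array with a negative size raises NegativeArraySizeException.
  def allocationFails(size: Int): Boolean =
    try {
      val unused = new Array[Byte](size)
      unused.length < 0
    } catch {
      case _: NegativeArraySizeException => true
    }
}
```

Three 1 GiB chunks sum to a negative `Int`, and allocating an array of that size fails in the same way as described above.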

This PR proposes two high-level ideas:
  1. Use `org.apache.spark.util.io.ChunkedByteBufferOutputStream`, which has a way to output the underlying buffer as `Array[Array[Byte]]`.
  2. Change the signatures from `Array[Byte]` to `Array[Array[Byte]]` in order to handle over 2GB compressed data.
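
The idea behind both points can be sketched as follows. This is a hypothetical, simplified chunked stream written for illustration; it is not Spark's `ChunkedByteBufferOutputStream`, and the class name `ChunkedBytesSketch` and its methods are assumptions.

```scala
import java.io.OutputStream
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified chunked stream; not Spark's ChunkedByteBufferOutputStream.
final class ChunkedBytesSketch(chunkSize: Int) extends OutputStream {
  require(chunkSize > 0, "chunkSize must be positive")

  private val chunks = ArrayBuffer.empty[Array[Byte]]
  private var posInLast = chunkSize // forces a chunk allocation on the first write
  private var total = 0L            // Long, so the count cannot wrap at 2 GiB

  override def write(b: Int): Unit = {
    if (posInLast == chunkSize) {
      // Current chunk is full (or none exists yet): start a new one.
      chunks += new Array[Byte](chunkSize)
      posInLast = 0
    }
    val current = chunks.last
    current(posInLast) = b.toByte
    posInLast += 1
    total += 1
  }

  // Total number of bytes written so far.
  def size: Long = total

  // Return the data as Array[Array[Byte]]; the final chunk is trimmed to the bytes used.
  def toChunks: Array[Array[Byte]] =
    if (chunks.isEmpty) Array.empty[Array[Byte]]
    else chunks.init.toArray :+ java.util.Arrays.copyOf(chunks.last, posInLast)
}
```

Because no single array ever has to hold the whole payload, the total can exceed what one `Array[Byte]` can address, which is why the signatures move to `Array[Array[Byte]]`.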

### Why are the changes needed?
This issue seems to have been missed in the earlier effort to address the 2GB limitations, [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235).

Without this fix, `spark.default.parallelism` needs to be kept low. The drawback of a smaller `spark.default.parallelism` is that it requires more executor memory (more data per partition). Setting `spark.io.compression.zstd.level` to a higher number (default 1) hardly helps.

That essentially means there is a data size limit for shuffling, so it does not scale.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed existing tests
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite"
```
Also added a new unit test
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite  -- -z SPARK-32210"
```
Ran the benchmark using GitHub Actions and did not observe any performance penalties. The results are attached in this PR:
```
core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
```

Closes #33721 from kazuyukitanimura/SPARK-32210.

Authored-by: Kazuyuki Tanimura <ktanimura@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 8ee464cd7a)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2021-08-16 09:11:51 -07:00


OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 10 blocks w/ broadcast:    Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
Serialization                                           143           164         55        1.4        716.5      1.0X
Deserialization                                         252           300         43        0.8       1262.4      0.6X

Compressed Serialized MapStatus sizes: 412 bytes
Compressed Serialized Broadcast MapStatus sizes: 2 MB

OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 10 blocks w/o broadcast:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
Serialization                                           137           139          1        1.5        684.2      1.0X
Deserialization                                         252           259         13        0.8       1259.5      0.5X

Compressed Serialized MapStatus sizes: 2 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes

OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 100 blocks w/ broadcast:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
Serialization                                           279           322        116        0.7       1394.6      1.0X
Deserialization                                         275           287         28        0.7       1372.7      1.0X

Compressed Serialized MapStatus sizes: 427 bytes
Compressed Serialized Broadcast MapStatus sizes: 13 MB

OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 100 blocks w/o broadcast:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
Serialization                                           262           263          1        0.8       1310.3      1.0X
Deserialization                                         274           288         22        0.7       1370.5      1.0X

Compressed Serialized MapStatus sizes: 13 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes

OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 1000 blocks w/ broadcast:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
Serialization                                          1208          1208          1        0.2       6038.4      1.0X
Deserialization                                         555           783        394        0.4       2774.2      2.2X

Compressed Serialized MapStatus sizes: 562 bytes
Compressed Serialized Broadcast MapStatus sizes: 121 MB

OpenJDK 64-Bit Server VM 1.8.0_302-b08 on Linux 5.8.0-1039-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
200000 MapOutputs, 1000 blocks w/o broadcast: Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
----------------------------------------------------------------------------------------------------------------------
Serialization                                          1097          1097          1        0.2       5484.2      1.0X
Deserialization                                         554           596         48        0.4       2771.3      2.0X

Compressed Serialized MapStatus sizes: 121 MB
Compressed Serialized Broadcast MapStatus sizes: 0 bytes